import os
import os.path
import unittest
import tempfile

from docker.client import Client
from docker.constants import DEFAULT_DOCKER_API_VERSION
from docker.errors import DockerException
from docker.utils import (
    parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
    create_host_config, Ulimit, LogConfig, parse_bytes, parse_env_file
)
from docker.utils.ports import build_port_bindings, split_port
from docker.auth import resolve_repository_name, resolve_authconfig

import base


class UtilsTest(base.BaseTestCase):
    """
    Unit tests for the helpers imported from docker.utils,
    docker.utils.ports and docker.auth.
    """
    # Include the default failure message alongside any custom `msg`.
    longMessage = True

    def generate_tempfile(self, file_content=None):
        """
        Generates a temporary file for tests with the content
        of 'file_content' (encoded as UTF-8) and returns the filename.
        When 'file_content' is None an empty file is created.
        Don't forget to unlink the file with os.unlink() after.
        """
        if file_content is None:
            # The declared default used to crash on `None.encode`.
            file_content = ''
        # delete=False keeps the file on disk after the handle is closed;
        # the `with` block closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(delete=False) as local_tempfile:
            local_tempfile.write(file_content.encode('UTF-8'))
        return local_tempfile.name

    def setUp(self):
        """Snapshot the environment so tearDown can restore it."""
        self.os_environ = os.environ.copy()

    def tearDown(self):
        """
        Restore the environment captured in setUp.

        The mapping is restored in place: rebinding `os.environ` to the
        plain dict returned by `copy()` would replace the special mapping
        object for the rest of the process.
        """
        os.environ.clear()
        os.environ.update(self.os_environ)

    def test_parse_repository_tag(self):
        self.assertEqual(parse_repository_tag("root"),
                         ("root", None))
        self.assertEqual(parse_repository_tag("root:tag"),
                         ("root", "tag"))
        self.assertEqual(parse_repository_tag("user/repo"),
                         ("user/repo", None))
        self.assertEqual(parse_repository_tag("user/repo:tag"),
                         ("user/repo", "tag"))
        # A colon inside the registry host:port part is not a tag separator.
        self.assertEqual(parse_repository_tag("url:5000/repo"),
                         ("url:5000/repo", None))
        self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
                         ("url:5000/repo", "tag"))

    def test_parse_bytes(self):
        self.assertEqual(parse_bytes("512MB"), 536870912)
        self.assertEqual(parse_bytes("512M"), 536870912)
        # Unknown unit suffixes are rejected.
        self.assertRaises(DockerException, parse_bytes, "512MK")
        self.assertRaises(DockerException, parse_bytes, "512L")

    def test_parse_host(self):
        invalid_hosts = [
            '0.0.0.0',
            'tcp://',
            'udp://127.0.0.1',
            'udp://127.0.0.1:2375',
        ]

        # Maps each accepted input to the URL parse_host must return.
        valid_hosts = {
            '0.0.0.1:5555': 'http://0.0.0.1:5555',
            ':6666': 'http://127.0.0.1:6666',
            'tcp://:7777': 'http://127.0.0.1:7777',
            'http://:7777': 'http://127.0.0.1:7777',
            'https://kokia.jp:2375': 'https://kokia.jp:2375',
            '': 'http+unix://var/run/docker.sock',
            None: 'http+unix://var/run/docker.sock',
            'unix:///var/run/docker.sock': 'http+unix:///var/run/docker.sock',
            'unix://': 'http+unix://var/run/docker.sock',
            'somehost.net:80/service/swarm': (
                'http://somehost.net:80/service/swarm'
            ),
        }

        for host in invalid_hosts:
            try:
                parsed = parse_host(host)
            except DockerException:
                pass
            else:
                # Only reached when parse_host did not raise.
                self.fail('Expected to fail but success: %s -> %s' % (
                    host, parsed
                ))

        for host, expected in valid_hosts.items():
            self.assertEqual(parse_host(host), expected, msg=host)

    def test_kwargs_from_env(self):
        # These variables are undone by tearDown.
        os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
                          DOCKER_CERT_PATH=os.path.join(
                              os.path.dirname(__file__),
                              'testdata/certs'),
                          DOCKER_TLS_VERIFY='1')
        kwargs = kwargs_from_env(assert_hostname=False)
        self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
        self.assertTrue('ca.pem' in kwargs['tls'].verify)
        self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
        self.assertTrue('key.pem' in kwargs['tls'].cert[1])
        self.assertEqual(False, kwargs['tls'].assert_hostname)
        try:
            # The generated kwargs must be accepted by the Client constructor.
            client = Client(**kwargs)
            self.assertEqual(kwargs['base_url'], client.base_url)
            self.assertEqual(kwargs['tls'].verify, client.verify)
            self.assertEqual(kwargs['tls'].cert, client.cert)
        except TypeError as e:
            self.fail(e)

    def test_parse_env_file_proper(self):
        env_file = self.generate_tempfile(
            file_content='USER=jdoe\nPASS=secret')
        try:
            get_parse_env_file = parse_env_file(env_file)
            self.assertEqual(get_parse_env_file,
                             {'USER': 'jdoe', 'PASS': 'secret'})
        finally:
            # Remove the temp file even when the assertion fails.
            os.unlink(env_file)

    def test_parse_env_file_commented_line(self):
        env_file = self.generate_tempfile(
            file_content='USER=jdoe\n#PASS=secret')
        try:
            get_parse_env_file = parse_env_file(env_file)
            # The '#'-prefixed line must not produce an entry.
            self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
        finally:
            os.unlink(env_file)

    def test_parse_env_file_invalid_line(self):
        env_file = self.generate_tempfile(
            file_content='USER jdoe')
        try:
            self.assertRaises(
                DockerException, parse_env_file, env_file)
        finally:
            os.unlink(env_file)

    def test_convert_filters(self):
        # (input filters, expected serialized string) pairs.
        tests = [
            ({'dangling': True}, '{"dangling": ["true"]}'),
            ({'dangling': "true"}, '{"dangling": ["true"]}'),
            ({'exited': 0}, '{"exited": [0]}'),
            ({'exited': [0, 1]}, '{"exited": [0, 1]}'),
        ]

        for filters, expected in tests:
            self.assertEqual(convert_filters(filters), expected)

    def test_create_host_config_no_options(self):
        config = create_host_config(version='1.19')
        self.assertFalse('NetworkMode' in config)

    def test_create_host_config_no_options_newer_api_version(self):
        config = create_host_config(version='1.20')
        self.assertEqual(config['NetworkMode'], 'default')

    def test_create_host_config_dict_ulimit(self):
        ulimit_dct = {'name': 'nofile', 'soft': 8096}
        config = create_host_config(
            ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION
        )
        self.assertIn('Ulimits', config)
        self.assertEqual(len(config['Ulimits']), 1)
        ulimit_obj = config['Ulimits'][0]
        # A plain dict must come back as a Ulimit instance.
        self.assertTrue(isinstance(ulimit_obj, Ulimit))
        self.assertEqual(ulimit_obj.name, ulimit_dct['name'])
        self.assertEqual(ulimit_obj.soft, ulimit_dct['soft'])
        self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft)

    def test_create_host_config_dict_ulimit_capitals(self):
        ulimit_dct = {'Name': 'nofile', 'Soft': 8096, 'Hard': 8096 * 4}
        config = create_host_config(
            ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION
        )
        self.assertIn('Ulimits', config)
        self.assertEqual(len(config['Ulimits']), 1)
        ulimit_obj = config['Ulimits'][0]
        self.assertTrue(isinstance(ulimit_obj, Ulimit))
        self.assertEqual(ulimit_obj.name, ulimit_dct['Name'])
        self.assertEqual(ulimit_obj.soft, ulimit_dct['Soft'])
        self.assertEqual(ulimit_obj.hard, ulimit_dct['Hard'])
        self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft)

    def test_create_host_config_obj_ulimit(self):
        ulimit_dct = Ulimit(name='nofile', soft=8096)
        config = create_host_config(
            ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION
        )
        self.assertIn('Ulimits', config)
        self.assertEqual(len(config['Ulimits']), 1)
        ulimit_obj = config['Ulimits'][0]
        self.assertTrue(isinstance(ulimit_obj, Ulimit))
        self.assertEqual(ulimit_obj, ulimit_dct)

    def test_ulimit_invalid_type(self):
        self.assertRaises(ValueError, lambda: Ulimit(name=None))
        self.assertRaises(ValueError, lambda: Ulimit(name='hello', soft='123'))
        self.assertRaises(ValueError, lambda: Ulimit(name='hello', hard='456'))

    def test_create_host_config_dict_logconfig(self):
        dct = {'type': LogConfig.types.SYSLOG, 'config': {'key1': 'val1'}}
        config = create_host_config(
            log_config=dct, version=DEFAULT_DOCKER_API_VERSION
        )
        self.assertIn('LogConfig', config)
        # A plain dict must come back as a LogConfig instance.
        self.assertTrue(isinstance(config['LogConfig'], LogConfig))
        self.assertEqual(dct['type'], config['LogConfig'].type)

    def test_create_host_config_obj_logconfig(self):
        obj = LogConfig(type=LogConfig.types.SYSLOG, config={'key1': 'val1'})
        config = create_host_config(
            log_config=obj, version=DEFAULT_DOCKER_API_VERSION
        )
        self.assertIn('LogConfig', config)
        self.assertTrue(isinstance(config['LogConfig'], LogConfig))
        self.assertEqual(obj, config['LogConfig'])

    def test_logconfig_invalid_type(self):
        self.assertRaises(ValueError, lambda: LogConfig(type='xxx', config={}))
        self.assertRaises(ValueError, lambda: LogConfig(
            type=LogConfig.types.JSON, config='helloworld'
        ))

    def test_resolve_repository_name(self):
        # docker hub library image
        self.assertEqual(
            resolve_repository_name('image'),
            ('index.docker.io', 'image'),
        )

        # docker hub image
        self.assertEqual(
            resolve_repository_name('username/image'),
            ('index.docker.io', 'username/image'),
        )

        # private registry
        self.assertEqual(
            resolve_repository_name('my.registry.net/image'),
            ('my.registry.net', 'image'),
        )

        # private registry with port
        self.assertEqual(
            resolve_repository_name('my.registry.net:5000/image'),
            ('my.registry.net:5000', 'image'),
        )

        # private registry with username
        self.assertEqual(
            resolve_repository_name('my.registry.net/username/image'),
            ('my.registry.net', 'username/image'),
        )

        # no dots but port
        self.assertEqual(
            resolve_repository_name('hostname:5000/image'),
            ('hostname:5000', 'image'),
        )

        # no dots but port and username
        self.assertEqual(
            resolve_repository_name('hostname:5000/username/image'),
            ('hostname:5000', 'username/image'),
        )

        # localhost
        self.assertEqual(
            resolve_repository_name('localhost/image'),
            ('localhost', 'image'),
        )

        # localhost with username
        self.assertEqual(
            resolve_repository_name('localhost/username/image'),
            ('localhost', 'username/image'),
        )

    def test_resolve_authconfig(self):
        auth_config = {
            'https://index.docker.io/v1/': {'auth': 'indexuser'},
            'my.registry.net': {'auth': 'privateuser'},
            'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
        }
        # hostname only
        self.assertEqual(
            resolve_authconfig(auth_config, 'my.registry.net'),
            {'auth': 'privateuser'}
        )
        # no protocol
        self.assertEqual(
            resolve_authconfig(auth_config, 'my.registry.net/v1/'),
            {'auth': 'privateuser'}
        )
        # no path
        self.assertEqual(
            resolve_authconfig(auth_config, 'http://my.registry.net'),
            {'auth': 'privateuser'}
        )
        # no path, trailing slash
        self.assertEqual(
            resolve_authconfig(auth_config, 'http://my.registry.net/'),
            {'auth': 'privateuser'}
        )
        # no path, wrong secure protocol
        self.assertEqual(
            resolve_authconfig(auth_config, 'https://my.registry.net'),
            {'auth': 'privateuser'}
        )
        # no path, wrong insecure protocol
        self.assertEqual(
            resolve_authconfig(auth_config, 'http://index.docker.io'),
            {'auth': 'indexuser'}
        )
        # with path, wrong protocol
        self.assertEqual(
            resolve_authconfig(auth_config, 'https://my.registry.net/v1/'),
            {'auth': 'privateuser'}
        )
        # default registry
        self.assertEqual(
            resolve_authconfig(auth_config), {'auth': 'indexuser'}
        )
        # default registry (explicit None)
        self.assertEqual(
            resolve_authconfig(auth_config, None), {'auth': 'indexuser'}
        )
        # fully explicit
        self.assertEqual(
            resolve_authconfig(auth_config, 'http://my.registry.net/v1/'),
            {'auth': 'privateuser'}
        )
        # legacy entry in config
        self.assertEqual(
            resolve_authconfig(auth_config, 'legacy.registry.url'),
            {'auth': 'legacyauth'}
        )
        # no matching entry
        self.assertTrue(
            resolve_authconfig(auth_config, 'does.not.exist') is None
        )

    def test_resolve_registry_and_auth(self):
        auth_config = {
            'https://index.docker.io/v1/': {'auth': 'indexuser'},
            'my.registry.net': {'auth': 'privateuser'},
        }

        # library image
        image = 'image'
        self.assertEqual(
            resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
            {'auth': 'indexuser'},
        )

        # docker hub image
        image = 'username/image'
        self.assertEqual(
            resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
            {'auth': 'indexuser'},
        )

        # private registry
        image = 'my.registry.net/image'
        self.assertEqual(
            resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
            {'auth': 'privateuser'},
        )

        # unauthenticated registry
        image = 'other.registry.net/image'
        self.assertEqual(
            resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
            None,
        )

    def test_split_port_with_host_ip(self):
        internal_port, external_port = split_port("127.0.0.1:1000:2000")
        self.assertEqual(internal_port, ["2000"])
        self.assertEqual(external_port, [("127.0.0.1", "1000")])

    def test_split_port_with_protocol(self):
        internal_port, external_port = split_port("127.0.0.1:1000:2000/udp")
        # The protocol suffix stays attached to the internal port.
        self.assertEqual(internal_port, ["2000/udp"])
        self.assertEqual(external_port, [("127.0.0.1", "1000")])

    def test_split_port_with_host_ip_no_port(self):
        internal_port, external_port = split_port("127.0.0.1::2000")
        self.assertEqual(internal_port, ["2000"])
        self.assertEqual(external_port, [("127.0.0.1", None)])

    def test_split_port_range_with_host_ip_no_port(self):
        internal_port, external_port = split_port("127.0.0.1::2000-2001")
        self.assertEqual(internal_port, ["2000", "2001"])
        self.assertEqual(external_port,
                         [("127.0.0.1", None), ("127.0.0.1", None)])

    def test_split_port_with_host_port(self):
        internal_port, external_port = split_port("1000:2000")
        self.assertEqual(internal_port, ["2000"])
        self.assertEqual(external_port, ["1000"])

    def test_split_port_range_with_host_port(self):
        internal_port, external_port = split_port("1000-1001:2000-2001")
        self.assertEqual(internal_port, ["2000", "2001"])
        self.assertEqual(external_port, ["1000", "1001"])

    def test_split_port_no_host_port(self):
        internal_port, external_port = split_port("2000")
        self.assertEqual(internal_port, ["2000"])
        self.assertEqual(external_port, None)

    def test_split_port_range_no_host_port(self):
        internal_port, external_port = split_port("2000-2001")
        self.assertEqual(internal_port, ["2000", "2001"])
        self.assertEqual(external_port, None)

    def test_split_port_range_with_protocol(self):
        internal_port, external_port = split_port(
            "127.0.0.1:1000-1001:2000-2001/udp")
        self.assertEqual(internal_port, ["2000/udp", "2001/udp"])
        self.assertEqual(external_port,
                         [("127.0.0.1", "1000"), ("127.0.0.1", "1001")])

    def test_split_port_invalid(self):
        self.assertRaises(ValueError,
                          lambda: split_port("0.0.0.0:1000:2000:tcp"))

    def test_non_matching_length_port_ranges(self):
        self.assertRaises(
            ValueError,
            lambda: split_port("0.0.0.0:1000-1010:2000-2002/tcp")
        )

    def test_port_and_range_invalid(self):
        self.assertRaises(ValueError,
                          lambda: split_port("0.0.0.0:1000:2000-2002/tcp"))

    def test_port_only_with_colon(self):
        self.assertRaises(ValueError,
                          lambda: split_port(":80"))

    def test_host_only_with_colon(self):
        self.assertRaises(ValueError,
                          lambda: split_port("localhost:"))

    def test_build_port_bindings_with_one_port(self):
        port_bindings = build_port_bindings(["127.0.0.1:1000:1000"])
        self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])

    def test_build_port_bindings_with_matching_internal_ports(self):
        port_bindings = build_port_bindings(
            ["127.0.0.1:1000:1000", "127.0.0.1:2000:1000"])
        # Both host bindings accumulate under the shared internal port.
        self.assertEqual(port_bindings["1000"],
                         [("127.0.0.1", "1000"), ("127.0.0.1", "2000")])

    def test_build_port_bindings_with_nonmatching_internal_ports(self):
        port_bindings = build_port_bindings(
            ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
        self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
        self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])

    def test_build_port_bindings_with_port_range(self):
        port_bindings = build_port_bindings(["127.0.0.1:1000-1001:1000-1001"])
        self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
        self.assertEqual(port_bindings["1001"], [("127.0.0.1", "1001")])

    def test_build_port_bindings_with_matching_internal_port_ranges(self):
        port_bindings = build_port_bindings(
            ["127.0.0.1:1000-1001:1000-1001", "127.0.0.1:2000-2001:1000-1001"])
        self.assertEqual(port_bindings["1000"],
                         [("127.0.0.1", "1000"), ("127.0.0.1", "2000")])
        self.assertEqual(port_bindings["1001"],
                         [("127.0.0.1", "1001"), ("127.0.0.1", "2001")])

    def test_build_port_bindings_with_nonmatching_internal_port_ranges(self):
        # NOTE(review): despite the name, the inputs are single ports and
        # match test_build_port_bindings_with_nonmatching_internal_ports;
        # presumably range specs were intended here - confirm and update.
        port_bindings = build_port_bindings(
            ["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
        self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
        self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])


if __name__ == '__main__':
    unittest.main()