diff --git a/tests/utils_test.py b/tests/utils_test.py index b67ac4ec..45929f73 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -25,6 +25,159 @@ TEST_CERT_DIR = os.path.join( ) +class HostConfigTest(base.BaseTestCase): + def test_create_host_config_no_options(self): + config = create_host_config(version='1.19') + self.assertFalse('NetworkMode' in config) + + def test_create_host_config_no_options_newer_api_version(self): + config = create_host_config(version='1.20') + self.assertEqual(config['NetworkMode'], 'default') + + def test_create_host_config_invalid_cpu_cfs_types(self): + with pytest.raises(TypeError): + create_host_config(version='1.20', cpu_quota='0') + + with pytest.raises(TypeError): + create_host_config(version='1.20', cpu_period='0') + + with pytest.raises(TypeError): + create_host_config(version='1.20', cpu_quota=23.11) + + with pytest.raises(TypeError): + create_host_config(version='1.20', cpu_period=1999.0) + + def test_create_host_config_with_cpu_quota(self): + config = create_host_config(version='1.20', cpu_quota=1999) + self.assertEqual(config.get('CpuQuota'), 1999) + + def test_create_host_config_with_cpu_period(self): + config = create_host_config(version='1.20', cpu_period=1999) + self.assertEqual(config.get('CpuPeriod'), 1999) + + +class UlimitTest(base.BaseTestCase): + def test_create_host_config_dict_ulimit(self): + ulimit_dct = {'name': 'nofile', 'soft': 8096} + config = create_host_config( + ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION + ) + self.assertIn('Ulimits', config) + self.assertEqual(len(config['Ulimits']), 1) + ulimit_obj = config['Ulimits'][0] + self.assertTrue(isinstance(ulimit_obj, Ulimit)) + self.assertEqual(ulimit_obj.name, ulimit_dct['name']) + self.assertEqual(ulimit_obj.soft, ulimit_dct['soft']) + self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft) + + def test_create_host_config_dict_ulimit_capitals(self): + ulimit_dct = {'Name': 'nofile', 'Soft': 8096, 'Hard': 8096 * 4} + config = create_host_config( + ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION + ) + self.assertIn('Ulimits', config) + self.assertEqual(len(config['Ulimits']), 1) + ulimit_obj = config['Ulimits'][0] + self.assertTrue(isinstance(ulimit_obj, Ulimit)) + self.assertEqual(ulimit_obj.name, ulimit_dct['Name']) + self.assertEqual(ulimit_obj.soft, ulimit_dct['Soft']) + self.assertEqual(ulimit_obj.hard, ulimit_dct['Hard']) + self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft) + + def test_create_host_config_obj_ulimit(self): + ulimit_dct = Ulimit(name='nofile', soft=8096) + config = create_host_config( + ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION + ) + self.assertIn('Ulimits', config) + self.assertEqual(len(config['Ulimits']), 1) + ulimit_obj = config['Ulimits'][0] + self.assertTrue(isinstance(ulimit_obj, Ulimit)) + self.assertEqual(ulimit_obj, ulimit_dct) + + def test_ulimit_invalid_type(self): + self.assertRaises(ValueError, lambda: Ulimit(name=None)) + self.assertRaises(ValueError, lambda: Ulimit(name='hello', soft='123')) + self.assertRaises(ValueError, lambda: Ulimit(name='hello', hard='456')) + + +class LogConfigTest(base.BaseTestCase): + def test_create_host_config_dict_logconfig(self): + dct = {'type': LogConfig.types.SYSLOG, 'config': {'key1': 'val1'}} + config = create_host_config( + version=DEFAULT_DOCKER_API_VERSION, log_config=dct + ) + self.assertIn('LogConfig', config) + self.assertTrue(isinstance(config['LogConfig'], LogConfig)) + self.assertEqual(dct['type'], config['LogConfig'].type) + + def test_create_host_config_obj_logconfig(self): + obj = LogConfig(type=LogConfig.types.SYSLOG, config={'key1': 'val1'}) + config = create_host_config( + version=DEFAULT_DOCKER_API_VERSION, log_config=obj + ) + self.assertIn('LogConfig', config) + self.assertTrue(isinstance(config['LogConfig'], LogConfig)) + self.assertEqual(obj, config['LogConfig']) + + def test_logconfig_invalid_config_type(self): + with pytest.raises(ValueError): + LogConfig(type=LogConfig.types.JSON, config='helloworld') + + +class KwargsFromEnvTest(base.BaseTestCase): + def setUp(self): + self.os_environ = os.environ.copy() + + def tearDown(self): + os.environ = self.os_environ + + def test_kwargs_from_env_empty(self): + os.environ.update(DOCKER_HOST='', + DOCKER_CERT_PATH='', + DOCKER_TLS_VERIFY='') + + kwargs = kwargs_from_env() + self.assertEqual(None, kwargs.get('base_url')) + self.assertEqual(None, kwargs.get('tls')) + + def test_kwargs_from_env_tls(self): + os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376', + DOCKER_CERT_PATH=TEST_CERT_DIR, + DOCKER_TLS_VERIFY='1') + kwargs = kwargs_from_env(assert_hostname=False) + self.assertEqual('https://192.168.59.103:2376', kwargs['base_url']) + self.assertTrue('ca.pem' in kwargs['tls'].verify) + self.assertTrue('cert.pem' in kwargs['tls'].cert[0]) + self.assertTrue('key.pem' in kwargs['tls'].cert[1]) + self.assertEqual(False, kwargs['tls'].assert_hostname) + try: + client = Client(**kwargs) + self.assertEqual(kwargs['base_url'], client.base_url) + self.assertEqual(kwargs['tls'].verify, client.verify) + self.assertEqual(kwargs['tls'].cert, client.cert) + except TypeError as e: + self.fail(e) + + def test_kwargs_from_env_no_cert_path(self): + try: + temp_dir = tempfile.mkdtemp() + cert_dir = os.path.join(temp_dir, '.docker') + shutil.copytree(TEST_CERT_DIR, cert_dir) + + os.environ.update(HOME=temp_dir, + DOCKER_CERT_PATH='', + DOCKER_TLS_VERIFY='1') + + kwargs = kwargs_from_env() + self.assertIn(cert_dir, kwargs['tls'].verify) + self.assertIn(cert_dir, kwargs['tls'].cert[0]) + self.assertIn(cert_dir, kwargs['tls'].cert[1]) + finally: + if temp_dir: + shutil.rmtree(temp_dir) + + class UtilsTest(base.BaseTestCase): longMessage = True @@ -39,12 +192,6 @@ class UtilsTest(base.BaseTestCase): local_tempfile.close() return local_tempfile.name - def setUp(self): - self.os_environ = os.environ.copy() - - def tearDown(self): - os.environ = self.os_environ - def test_parse_repository_tag(self): self.assertEqual(parse_repository_tag("root"), ("root", None)) @@ -103,51 +250,6 @@ class UtilsTest(base.BaseTestCase): assert parse_host(val, 'win32') == tcp_port - def test_kwargs_from_env_empty(self): - os.environ.update(DOCKER_HOST='', - DOCKER_CERT_PATH='', - DOCKER_TLS_VERIFY='') - - kwargs = kwargs_from_env() - self.assertEqual(None, kwargs.get('base_url')) - self.assertEqual(None, kwargs.get('tls')) - - def test_kwargs_from_env_tls(self): - os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376', - DOCKER_CERT_PATH=TEST_CERT_DIR, - DOCKER_TLS_VERIFY='1') - kwargs = kwargs_from_env(assert_hostname=False) - self.assertEqual('https://192.168.59.103:2376', kwargs['base_url']) - self.assertTrue('ca.pem' in kwargs['tls'].verify) - self.assertTrue('cert.pem' in kwargs['tls'].cert[0]) - self.assertTrue('key.pem' in kwargs['tls'].cert[1]) - self.assertEqual(False, kwargs['tls'].assert_hostname) - try: - client = Client(**kwargs) - self.assertEqual(kwargs['base_url'], client.base_url) - self.assertEqual(kwargs['tls'].verify, client.verify) - self.assertEqual(kwargs['tls'].cert, client.cert) - except TypeError as e: - self.fail(e) - - def test_kwargs_from_env_no_cert_path(self): - try: - temp_dir = tempfile.mkdtemp() - cert_dir = os.path.join(temp_dir, '.docker') - shutil.copytree(TEST_CERT_DIR, cert_dir) - - os.environ.update(HOME=temp_dir, - DOCKER_CERT_PATH='', - DOCKER_TLS_VERIFY='1') - - kwargs = kwargs_from_env() - self.assertIn(cert_dir, kwargs['tls'].verify) - self.assertIn(cert_dir, kwargs['tls'].cert[0]) - self.assertIn(cert_dir, kwargs['tls'].cert[1]) - finally: - if temp_dir: - shutil.rmtree(temp_dir) - def test_parse_env_file_proper(self): env_file = self.generate_tempfile( file_content='USER=jdoe\nPASS=secret') @@ -181,79 +283,6 @@ class UtilsTest(base.BaseTestCase): for filters, expected in tests: self.assertEqual(convert_filters(filters), expected) - def test_create_host_config_no_options(self): - config = create_host_config(version='1.19') - self.assertFalse('NetworkMode' in config) - - def test_create_host_config_no_options_newer_api_version(self): - config = create_host_config(version='1.20') - self.assertEqual(config['NetworkMode'], 'default') - - def test_create_host_config_dict_ulimit(self): - ulimit_dct = {'name': 'nofile', 'soft': 8096} - config = create_host_config( - ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION - ) - self.assertIn('Ulimits', config) - self.assertEqual(len(config['Ulimits']), 1) - ulimit_obj = config['Ulimits'][0] - self.assertTrue(isinstance(ulimit_obj, Ulimit)) - self.assertEqual(ulimit_obj.name, ulimit_dct['name']) - self.assertEqual(ulimit_obj.soft, ulimit_dct['soft']) - self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft) - - def test_create_host_config_dict_ulimit_capitals(self): - ulimit_dct = {'Name': 'nofile', 'Soft': 8096, 'Hard': 8096 * 4} - config = create_host_config( - ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION - ) - self.assertIn('Ulimits', config) - self.assertEqual(len(config['Ulimits']), 1) - ulimit_obj = config['Ulimits'][0] - self.assertTrue(isinstance(ulimit_obj, Ulimit)) - self.assertEqual(ulimit_obj.name, ulimit_dct['Name']) - self.assertEqual(ulimit_obj.soft, ulimit_dct['Soft']) - self.assertEqual(ulimit_obj.hard, ulimit_dct['Hard']) - self.assertEqual(ulimit_obj['Soft'], ulimit_obj.soft) - - def test_create_host_config_obj_ulimit(self): - ulimit_dct = Ulimit(name='nofile', soft=8096) - config = create_host_config( - ulimits=[ulimit_dct], version=DEFAULT_DOCKER_API_VERSION - ) - self.assertIn('Ulimits', config) - self.assertEqual(len(config['Ulimits']), 1) - ulimit_obj = config['Ulimits'][0] - self.assertTrue(isinstance(ulimit_obj, Ulimit)) - self.assertEqual(ulimit_obj, ulimit_dct) - - def test_ulimit_invalid_type(self): - self.assertRaises(ValueError, lambda: Ulimit(name=None)) - self.assertRaises(ValueError, lambda: Ulimit(name='hello', soft='123')) - self.assertRaises(ValueError, lambda: Ulimit(name='hello', hard='456')) - - def test_create_host_config_dict_logconfig(self): - dct = {'type': LogConfig.types.SYSLOG, 'config': {'key1': 'val1'}} - config = create_host_config( - version=DEFAULT_DOCKER_API_VERSION, log_config=dct - ) - self.assertIn('LogConfig', config) - self.assertTrue(isinstance(config['LogConfig'], LogConfig)) - self.assertEqual(dct['type'], config['LogConfig'].type) - - def test_create_host_config_obj_logconfig(self): - obj = LogConfig(type=LogConfig.types.SYSLOG, config={'key1': 'val1'}) - config = create_host_config( - version=DEFAULT_DOCKER_API_VERSION, log_config=obj - ) - self.assertIn('LogConfig', config) - self.assertTrue(isinstance(config['LogConfig'], LogConfig)) - self.assertEqual(obj, config['LogConfig']) - - def test_logconfig_invalid_config_type(self): - with pytest.raises(ValueError): - LogConfig(type=LogConfig.types.JSON, config='helloworld') - def test_resolve_repository_name(self): # docker hub library image self.assertEqual( @@ -407,6 +436,8 @@ class UtilsTest(base.BaseTestCase): None, ) + +class PortsTest(base.BaseTestCase): def test_split_port_with_host_ip(self): internal_port, external_port = split_port("127.0.0.1:1000:2000") self.assertEqual(internal_port, ["2000"])