Re-write kwargs_from_env to handle TLS options better

This more closely matches the way the docker client interprets the
relevant environment variables. Among other things, it's now possible to
set DOCKER_TLS_VERIFY=false.

Signed-off-by: Mike Dougherty <mike.dougherty@docker.com>
This commit is contained in:
Mike Dougherty 2016-02-03 18:59:01 -08:00
parent bba8e28f82
commit 387db11009
3 changed files with 91 additions and 33 deletions

View File

@ -6,6 +6,7 @@ from .ssladapter import ssladapter
class TLSConfig(object):
cert = None
ca_cert = None
verify = None
ssl_version = None
@ -48,27 +49,18 @@ class TLSConfig(object):
)
self.cert = (tls_cert, tls_key)
# Either set verify to True (public/default CA checks) or to the
# path of a CA Cert file.
if verify is not None:
if not ca_cert:
self.verify = verify
elif os.path.isfile(ca_cert):
if not verify:
raise errors.TLSParameterError(
'verify can not be False when a CA cert is'
' provided.'
)
self.verify = ca_cert
else:
raise errors.TLSParameterError(
'Invalid CA certificate provided for `tls_ca_cert`.'
)
# If verify is set, make sure the cert exists
self.verify = verify
self.ca_cert = ca_cert
if self.verify and self.ca_cert and not os.path.isfile(self.ca_cert):
raise errors.TLSParameterError(
'Invalid CA certificate provided for `tls_ca_cert`.'
)
def configure_client(self, client):
client.ssl_version = self.ssl_version
if self.verify is not None:
client.verify = self.verify
client.verify = self.verify
client.ca_cert = self.ca_cert
if self.cert:
client.cert = self.cert
client.mount('https://', ssladapter.SSLAdapter(

View File

@ -445,26 +445,45 @@ def parse_devices(devices):
def kwargs_from_env(ssl_version=None, assert_hostname=None):
host = os.environ.get('DOCKER_HOST')
cert_path = os.environ.get('DOCKER_CERT_PATH')
# empty string for cert path is the same as unset.
cert_path = os.environ.get('DOCKER_CERT_PATH') or None
# empty string for tls verify counts as "false".
# Any value or 'unset' counts as true.
tls_verify = os.environ.get('DOCKER_TLS_VERIFY')
if tls_verify == '':
tls_verify = False
enable_tls = True
else:
tls_verify = tls_verify is not None
enable_tls = cert_path or tls_verify
params = {}
if host:
params['base_url'] = (host.replace('tcp://', 'https://')
if tls_verify else host)
if enable_tls else host)
if tls_verify and not cert_path:
if not enable_tls:
return params
if not cert_path:
cert_path = os.path.join(os.path.expanduser('~'), '.docker')
if tls_verify and cert_path:
params['tls'] = tls.TLSConfig(
client_cert=(os.path.join(cert_path, 'cert.pem'),
os.path.join(cert_path, 'key.pem')),
ca_cert=os.path.join(cert_path, 'ca.pem'),
verify=True,
ssl_version=ssl_version,
assert_hostname=assert_hostname)
if not tls_verify and assert_hostname is None:
# assert_hostname is a subset of TLS verification,
# so if it's not set already then set it to false.
assert_hostname = False
params['tls'] = tls.TLSConfig(
client_cert=(os.path.join(cert_path, 'cert.pem'),
os.path.join(cert_path, 'key.pem')),
ca_cert=os.path.join(cert_path, 'ca.pem'),
verify=tls_verify,
ssl_version=ssl_version,
assert_hostname=assert_hostname,
assert_fingerprint=tls_verify)
return params

View File

@ -165,8 +165,8 @@ class KwargsFromEnvTest(base.BaseTestCase):
def test_kwargs_from_env_empty(self):
os.environ.update(DOCKER_HOST='',
DOCKER_CERT_PATH='',
DOCKER_TLS_VERIFY='')
DOCKER_CERT_PATH='')
os.environ.pop('DOCKER_TLS_VERIFY', None)
kwargs = kwargs_from_env()
self.assertEqual(None, kwargs.get('base_url'))
@ -178,10 +178,11 @@ class KwargsFromEnvTest(base.BaseTestCase):
DOCKER_TLS_VERIFY='1')
kwargs = kwargs_from_env(assert_hostname=False)
self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
self.assertTrue('ca.pem' in kwargs['tls'].verify)
self.assertTrue('ca.pem' in kwargs['tls'].ca_cert)
self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
self.assertTrue('key.pem' in kwargs['tls'].cert[1])
self.assertEqual(False, kwargs['tls'].assert_hostname)
self.assertTrue(kwargs['tls'].verify)
try:
client = Client(**kwargs)
self.assertEqual(kwargs['base_url'], client.base_url)
@ -190,6 +191,51 @@ class KwargsFromEnvTest(base.BaseTestCase):
except TypeError as e:
self.fail(e)
def test_kwargs_from_env_tls_verify_false(self):
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='')
kwargs = kwargs_from_env(assert_hostname=True)
self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
self.assertTrue('ca.pem' in kwargs['tls'].ca_cert)
self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
self.assertTrue('key.pem' in kwargs['tls'].cert[1])
self.assertEqual(True, kwargs['tls'].assert_hostname)
self.assertEqual(False, kwargs['tls'].verify)
try:
client = Client(**kwargs)
self.assertEqual(kwargs['base_url'], client.base_url)
self.assertEqual(kwargs['tls'].ca_cert, client.ca_cert)
self.assertEqual(kwargs['tls'].cert, client.cert)
self.assertFalse(kwargs['tls'].verify)
except TypeError as e:
self.fail(e)
def test_kwargs_from_env_tls_verify_false_no_cert(self):
temp_dir = tempfile.mkdtemp()
cert_dir = os.path.join(temp_dir, '.docker')
shutil.copytree(TEST_CERT_DIR, cert_dir)
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
HOME=temp_dir,
DOCKER_TLS_VERIFY='')
os.environ.pop('DOCKER_CERT_PATH', None)
kwargs = kwargs_from_env(assert_hostname=True)
self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
self.assertTrue('ca.pem' in kwargs['tls'].ca_cert)
self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
self.assertTrue('key.pem' in kwargs['tls'].cert[1])
self.assertEqual(True, kwargs['tls'].assert_hostname)
self.assertEqual(False, kwargs['tls'].verify)
try:
client = Client(**kwargs)
self.assertEqual(kwargs['base_url'], client.base_url)
self.assertEqual(kwargs['tls'].ca_cert, client.ca_cert)
self.assertEqual(kwargs['tls'].cert, client.cert)
self.assertFalse(kwargs['tls'].verify)
except TypeError as e:
self.fail(e)
def test_kwargs_from_env_no_cert_path(self):
try:
temp_dir = tempfile.mkdtemp()
@ -201,7 +247,8 @@ class KwargsFromEnvTest(base.BaseTestCase):
DOCKER_TLS_VERIFY='1')
kwargs = kwargs_from_env()
self.assertIn(cert_dir, kwargs['tls'].verify)
self.assertTrue(kwargs['tls'].verify)
self.assertIn(cert_dir, kwargs['tls'].ca_cert)
self.assertIn(cert_dir, kwargs['tls'].cert[0])
self.assertIn(cert_dir, kwargs['tls'].cert[1])
finally: