mirror of https://github.com/docker/docker-py.git
Merge pull request #120 from mpetazzoni/improve-auth
Rework auth config loading and use of auth data for login/pull/push
This commit is contained in:
commit
2ac5cca84d
|
@ -13,6 +13,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import base64
|
||||
import fileinput
|
||||
import json
|
||||
import os
|
||||
|
||||
|
@ -21,6 +22,7 @@ import six
|
|||
import docker.utils as utils
|
||||
|
||||
INDEX_URL = 'https://index.docker.io/v1/'
|
||||
DOCKER_CONFIG_FILENAME = '.dockercfg'
|
||||
|
||||
|
||||
def swap_protocol(url):
|
||||
|
@ -59,12 +61,15 @@ def resolve_repository_name(repo_name):
|
|||
return expand_registry_url(parts[0]), parts[1]
|
||||
|
||||
|
||||
def resolve_authconfig(authconfig, registry):
|
||||
default = {}
|
||||
if registry == INDEX_URL or registry == '':
|
||||
# default to the index server
|
||||
return authconfig['Configs'].get(INDEX_URL, default)
|
||||
# if its not the index server there are three cases:
|
||||
def resolve_authconfig(authconfig, registry=None):
|
||||
"""Return the authentication data from the given auth configuration for a
|
||||
specific registry. We'll do our best to infer the correct URL for the
|
||||
registry, trying both http and https schemes. Returns an empty dictionnary
|
||||
if no data exists."""
|
||||
# Default to the public index server
|
||||
registry = registry or INDEX_URL
|
||||
|
||||
# Ff its not the index server there are three cases:
|
||||
#
|
||||
# 1. this is a full config url -> it should be used as is
|
||||
# 2. it could be a full url, but with the wrong protocol
|
||||
|
@ -77,11 +82,9 @@ def resolve_authconfig(authconfig, registry):
|
|||
if not registry.startswith('http:') and not registry.startswith('https:'):
|
||||
registry = 'https://' + registry
|
||||
|
||||
if registry in authconfig['Configs']:
|
||||
return authconfig['Configs'][registry]
|
||||
elif swap_protocol(registry) in authconfig['Configs']:
|
||||
return authconfig['Configs'][swap_protocol(registry)]
|
||||
return default
|
||||
if registry in authconfig:
|
||||
return authconfig[registry]
|
||||
return authconfig.get(swap_protocol(registry), None)
|
||||
|
||||
|
||||
def decode_auth(auth):
|
||||
|
@ -98,38 +101,53 @@ def encode_header(auth):
|
|||
|
||||
|
||||
def load_config(root=None):
|
||||
root = root or os.environ['HOME']
|
||||
config = {
|
||||
'Configs': {},
|
||||
'rootPath': root
|
||||
}
|
||||
"""Loads authentication data from a Docker configuration file in the given
|
||||
root directory."""
|
||||
conf = {}
|
||||
data = None
|
||||
|
||||
config_file = os.path.join(root, '.dockercfg')
|
||||
if not os.path.exists(config_file):
|
||||
return config
|
||||
config_file = os.path.join(root or os.environ.get('HOME', '.'),
|
||||
DOCKER_CONFIG_FILENAME)
|
||||
|
||||
f = open(config_file)
|
||||
# First try as JSON
|
||||
try:
|
||||
config['Configs'] = json.load(f)
|
||||
for k, conf in six.iteritems(config['Configs']):
|
||||
conf['Username'], conf['Password'] = decode_auth(conf['auth'])
|
||||
del conf['auth']
|
||||
config['Configs'][k] = conf
|
||||
except Exception:
|
||||
f.seek(0)
|
||||
buf = []
|
||||
for line in f:
|
||||
k, v = line.split(' = ')
|
||||
buf.append(v)
|
||||
if len(buf) < 2:
|
||||
raise Exception("The Auth config file is empty")
|
||||
user, pwd = decode_auth(buf[0])
|
||||
config['Configs'][INDEX_URL] = {
|
||||
'Username': user,
|
||||
'Password': pwd,
|
||||
'Email': buf[1]
|
||||
}
|
||||
finally:
|
||||
f.close()
|
||||
with open(config_file) as f:
|
||||
conf = {}
|
||||
for registry, entry in six.iteritems(json.load(f)):
|
||||
username, password = decode_auth(entry['auth'])
|
||||
conf[registry] = {
|
||||
'username': username,
|
||||
'password': password,
|
||||
'email': entry['email'],
|
||||
'serveraddress': registry,
|
||||
}
|
||||
return conf
|
||||
except:
|
||||
pass
|
||||
|
||||
return config
|
||||
# If that fails, we assume the configuration file contains a single
|
||||
# authentication token for the public registry in the following format:
|
||||
#
|
||||
# auth = AUTH_TOKEN
|
||||
# email = email@domain.com
|
||||
try:
|
||||
data = []
|
||||
for line in fileinput.input(config_file):
|
||||
data.append(line.strip().split(' = ')[1])
|
||||
if len(data) < 2:
|
||||
# Not enough data
|
||||
raise Exception('Invalid or empty configuration file!')
|
||||
|
||||
username, password = decode_auth(data[0])
|
||||
conf[INDEX_URL] = {
|
||||
'username': username,
|
||||
'password': password,
|
||||
'email': data[1],
|
||||
'serveraddress': INDEX_URL,
|
||||
}
|
||||
return conf
|
||||
except:
|
||||
pass
|
||||
|
||||
# If all fails, return an empty config
|
||||
return {}
|
||||
|
|
|
@ -75,12 +75,9 @@ class Client(requests.Session):
|
|||
self.base_url = base_url
|
||||
self._version = version
|
||||
self._timeout = timeout
|
||||
self._auth_configs = auth.load_config()
|
||||
|
||||
self.mount('unix://', unixconn.UnixAdapter(base_url, timeout))
|
||||
try:
|
||||
self._cfg = auth.load_config()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _set_request_timeout(self, kwargs):
|
||||
"""Prepare the kwargs for an HTTP request by inserting the timeout
|
||||
|
@ -516,24 +513,33 @@ class Client(requests.Session):
|
|||
|
||||
self._raise_for_status(res)
|
||||
|
||||
def login(self, username, password=None, email=None, registry=None):
|
||||
url = self._url("/auth")
|
||||
if registry is None:
|
||||
registry = auth.INDEX_URL
|
||||
if getattr(self, '_cfg', None) is None:
|
||||
self._cfg = auth.load_config()
|
||||
authcfg = auth.resolve_authconfig(self._cfg, registry)
|
||||
if 'username' in authcfg and authcfg['username'] == username:
|
||||
def login(self, username, password=None, email=None, registry=None,
|
||||
reauth=False):
|
||||
# If we don't have any auth data so far, try reloading the config file
|
||||
# one more time in case anything showed up in there.
|
||||
if not self._auth_configs:
|
||||
self._auth_configs = auth.load_config()
|
||||
|
||||
registry = registry or auth.INDEX_URL
|
||||
|
||||
authcfg = auth.resolve_authconfig(self._auth_configs, registry)
|
||||
# If we found an existing auth config for this registry and username
|
||||
# combination, we can return it immediately unless reauth is requested.
|
||||
if authcfg and authcfg.get('username', None) == username \
|
||||
and not reauth:
|
||||
return authcfg
|
||||
|
||||
req_data = {
|
||||
'username': username,
|
||||
'password': password,
|
||||
'email': email
|
||||
'email': email,
|
||||
'serveraddress': registry,
|
||||
}
|
||||
res = self._result(self._post_json(url, data=req_data), True)
|
||||
if res['Status'] == 'Login Succeeded':
|
||||
self._cfg['Configs'][registry] = req_data
|
||||
return res
|
||||
|
||||
response = self._post_json(self._url('/auth'), data=req_data)
|
||||
if response.status_code == 200:
|
||||
self._auth_configs[registry] = req_data
|
||||
return self._result(response, json=True)
|
||||
|
||||
def logs(self, container, stdout=True, stderr=True, stream=False):
|
||||
if isinstance(container, dict):
|
||||
|
@ -582,16 +588,20 @@ class Client(requests.Session):
|
|||
headers = {}
|
||||
|
||||
if utils.compare_version('1.5', self._version) >= 0:
|
||||
if getattr(self, '_cfg', None) is None:
|
||||
self._cfg = auth.load_config()
|
||||
authcfg = auth.resolve_authconfig(self._cfg, registry)
|
||||
# do not fail if no atuhentication exists
|
||||
# for this specific registry as we can have a readonly pull
|
||||
# If we don't have any auth data so far, try reloading the config
|
||||
# file one more time in case anything showed up in there.
|
||||
if not self._auth_configs:
|
||||
self._auth_configs = auth.load_config()
|
||||
authcfg = auth.resolve_authconfig(self._auth_configs, registry)
|
||||
|
||||
# Do not fail here if no atuhentication exists for this specific
|
||||
# registry as we can have a readonly pull. Just put the header if
|
||||
# we can.
|
||||
if authcfg:
|
||||
headers['X-Registry-Auth'] = auth.encode_header(authcfg)
|
||||
u = self._url("/images/create")
|
||||
response = self._post(u, params=params, headers=headers, stream=stream,
|
||||
timeout=None)
|
||||
|
||||
response = self._post(self._url('/images/create'), params=params,
|
||||
headers=headers, stream=stream, timeout=None)
|
||||
|
||||
if stream:
|
||||
return self._stream_helper(response)
|
||||
|
@ -602,26 +612,26 @@ class Client(requests.Session):
|
|||
registry, repo_name = auth.resolve_repository_name(repository)
|
||||
u = self._url("/images/{0}/push".format(repository))
|
||||
headers = {}
|
||||
if getattr(self, '_cfg', None) is None:
|
||||
self._cfg = auth.load_config()
|
||||
authcfg = auth.resolve_authconfig(self._cfg, registry)
|
||||
|
||||
if utils.compare_version('1.5', self._version) >= 0:
|
||||
# do not fail if no atuhentication exists
|
||||
# for this specific registry as we can have an anon push
|
||||
# If we don't have any auth data so far, try reloading the config
|
||||
# file one more time in case anything showed up in there.
|
||||
if not self._auth_configs:
|
||||
self._auth_configs = auth.load_config()
|
||||
authcfg = auth.resolve_authconfig(self._auth_configs, registry)
|
||||
|
||||
# Do not fail here if no atuhentication exists for this specific
|
||||
# registry as we can have a readonly pull. Just put the header if
|
||||
# we can.
|
||||
if authcfg:
|
||||
headers['X-Registry-Auth'] = auth.encode_header(authcfg)
|
||||
|
||||
if stream:
|
||||
return self._stream_helper(
|
||||
self._post_json(u, None, headers=headers, stream=True))
|
||||
else:
|
||||
return self._result(
|
||||
self._post_json(u, None, headers=headers, stream=False))
|
||||
if stream:
|
||||
return self._stream_helper(
|
||||
self._post_json(u, authcfg, stream=True))
|
||||
response = self._post_json(u, None, headers=headers, stream=stream)
|
||||
else:
|
||||
return self._result(self._post_json(u, authcfg, stream=False))
|
||||
response = self._post_json(u, authcfg, stream=stream)
|
||||
|
||||
return stream and self._stream_helper(response) \
|
||||
or self._result(response)
|
||||
|
||||
def remove_container(self, container, v=False, link=False):
|
||||
if isinstance(container, dict):
|
||||
|
|
|
@ -775,11 +775,29 @@ class TestLoadConfig(BaseTestCase):
|
|||
f.write('email = sakuya@scarlet.net')
|
||||
f.close()
|
||||
cfg = docker.auth.load_config(folder)
|
||||
self.assertNotEqual(cfg['Configs'][docker.auth.INDEX_URL], None)
|
||||
cfg = cfg['Configs'][docker.auth.INDEX_URL]
|
||||
self.assertEqual(cfg['Username'], b'sakuya')
|
||||
self.assertEqual(cfg['Password'], b'izayoi')
|
||||
self.assertEqual(cfg['Email'], 'sakuya@scarlet.net')
|
||||
self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
|
||||
cfg = cfg[docker.auth.INDEX_URL]
|
||||
self.assertEqual(cfg['username'], b'sakuya')
|
||||
self.assertEqual(cfg['password'], b'izayoi')
|
||||
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
|
||||
self.assertEqual(cfg.get('Auth'), None)
|
||||
|
||||
|
||||
class TestLoadJSONConfig(BaseTestCase):
|
||||
def runTest(self):
|
||||
folder = tempfile.mkdtemp()
|
||||
f = open(os.path.join(folder, '.dockercfg'), 'w')
|
||||
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
|
||||
email_ = 'sakuya@scarlet.net'
|
||||
f.write('{{"{}": {{"auth": "{}", "email": "{}"}}}}\n'.format(
|
||||
docker.auth.INDEX_URL, auth_, email_))
|
||||
f.close()
|
||||
cfg = docker.auth.load_config(folder)
|
||||
self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
|
||||
cfg = cfg[docker.auth.INDEX_URL]
|
||||
self.assertEqual(cfg['username'], b'sakuya')
|
||||
self.assertEqual(cfg['password'], b'izayoi')
|
||||
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
|
||||
self.assertEqual(cfg.get('Auth'), None)
|
||||
|
||||
|
||||
|
|
|
@ -1018,10 +1018,6 @@ class DockerClientTest(unittest.TestCase):
|
|||
folder = tempfile.mkdtemp()
|
||||
cfg = docker.auth.load_config(folder)
|
||||
self.assertTrue(cfg is not None)
|
||||
self.assertTrue('Configs' in cfg)
|
||||
self.assertEqual(cfg['Configs'], {})
|
||||
self.assertTrue('rootPath' in cfg)
|
||||
self.assertEqual(cfg['rootPath'], folder)
|
||||
|
||||
def test_load_config(self):
|
||||
folder = tempfile.mkdtemp()
|
||||
|
@ -1031,12 +1027,13 @@ class DockerClientTest(unittest.TestCase):
|
|||
f.write('email = sakuya@scarlet.net')
|
||||
f.close()
|
||||
cfg = docker.auth.load_config(folder)
|
||||
self.assertNotEqual(cfg['Configs'][docker.auth.INDEX_URL], None)
|
||||
cfg = cfg['Configs'][docker.auth.INDEX_URL]
|
||||
self.assertEqual(cfg['Username'], 'sakuya')
|
||||
self.assertEqual(cfg['Password'], 'izayoi')
|
||||
self.assertEqual(cfg['Email'], 'sakuya@scarlet.net')
|
||||
self.assertEqual(cfg.get('Auth'), None)
|
||||
self.assertTrue(docker.auth.INDEX_URL in cfg)
|
||||
self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
|
||||
cfg = cfg[docker.auth.INDEX_URL]
|
||||
self.assertEqual(cfg['username'], 'sakuya')
|
||||
self.assertEqual(cfg['password'], 'izayoi')
|
||||
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
|
||||
self.assertEqual(cfg.get('auth'), None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue