Merge pull request #2188 from docker/c6374-credhelpers

Modernize auth management
This commit is contained in:
Joffrey F 2018-11-30 15:32:30 -08:00 committed by GitHub
commit c344660f20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 537 additions and 293 deletions

View File

@ -288,36 +288,18 @@ class BuildApiMixin(object):
# file one more time in case anything showed up in there. # file one more time in case anything showed up in there.
if not self._auth_configs: if not self._auth_configs:
log.debug("No auth config in memory - loading from filesystem") log.debug("No auth config in memory - loading from filesystem")
self._auth_configs = auth.load_config() self._auth_configs = auth.load_config(
credsore_env=self.credsore_env
)
# Send the full auth configuration (if any exists), since the build # Send the full auth configuration (if any exists), since the build
# could use any (or all) of the registries. # could use any (or all) of the registries.
if self._auth_configs: if self._auth_configs:
auth_cfgs = self._auth_configs auth_data = self._auth_configs.get_all_credentials()
auth_data = {}
if auth_cfgs.get('credsStore'):
# Using a credentials store, we need to retrieve the
# credentials for each registry listed in the config.json file
# Matches CLI behavior: https://github.com/docker/docker/blob/
# 67b85f9d26f1b0b2b240f2d794748fac0f45243c/cliconfig/
# credentials/native_store.go#L68-L83
for registry in auth_cfgs.get('auths', {}).keys():
auth_data[registry] = auth.resolve_authconfig(
auth_cfgs, registry,
credstore_env=self.credstore_env,
)
else:
for registry in auth_cfgs.get('credHelpers', {}).keys():
auth_data[registry] = auth.resolve_authconfig(
auth_cfgs, registry,
credstore_env=self.credstore_env
)
for registry, creds in auth_cfgs.get('auths', {}).items():
if registry not in auth_data:
auth_data[registry] = creds
# See https://github.com/docker/docker-py/issues/1683 # See https://github.com/docker/docker-py/issues/1683
if auth.INDEX_NAME in auth_data: if auth.INDEX_URL not in auth_data and auth.INDEX_URL in auth_data:
auth_data[auth.INDEX_URL] = auth_data[auth.INDEX_NAME] auth_data[auth.INDEX_URL] = auth_data.get(auth.INDEX_NAME, {})
log.debug( log.debug(
'Sending auth config ({0})'.format( 'Sending auth config ({0})'.format(
@ -325,6 +307,7 @@ class BuildApiMixin(object):
) )
) )
if auth_data:
headers['X-Registry-Config'] = auth.encode_header( headers['X-Registry-Config'] = auth.encode_header(
auth_data auth_data
) )

View File

@ -115,7 +115,7 @@ class APIClient(
self._general_configs = config.load_general_config() self._general_configs = config.load_general_config()
self._auth_configs = auth.load_config( self._auth_configs = auth.load_config(
config_dict=self._general_configs config_dict=self._general_configs, credstore_env=credstore_env,
) )
self.credstore_env = credstore_env self.credstore_env = credstore_env
@ -480,4 +480,6 @@ class APIClient(
Returns: Returns:
None None
""" """
self._auth_configs = auth.load_config(dockercfg_path) self._auth_configs = auth.load_config(
dockercfg_path, credstore_env=self.credstore_env
)

View File

@ -124,13 +124,15 @@ class DaemonApiMixin(object):
# If dockercfg_path is passed check to see if the config file exists, # If dockercfg_path is passed check to see if the config file exists,
# if so load that config. # if so load that config.
if dockercfg_path and os.path.exists(dockercfg_path): if dockercfg_path and os.path.exists(dockercfg_path):
self._auth_configs = auth.load_config(dockercfg_path) self._auth_configs = auth.load_config(
elif not self._auth_configs: dockercfg_path, credstore_env=self.credstore_env
self._auth_configs = auth.load_config()
authcfg = auth.resolve_authconfig(
self._auth_configs, registry, credstore_env=self.credstore_env,
) )
elif not self._auth_configs:
self._auth_configs = auth.load_config(
credstore_env=self.credstore_env
)
authcfg = self._auth_configs.resolve_authconfig(registry)
# If we found an existing auth config for this registry and username # If we found an existing auth config for this registry and username
# combination, we can return it immediately unless reauth is requested. # combination, we can return it immediately unless reauth is requested.
if authcfg and authcfg.get('username', None) == username \ if authcfg and authcfg.get('username', None) == username \
@ -146,9 +148,7 @@ class DaemonApiMixin(object):
response = self._post_json(self._url('/auth'), data=req_data) response = self._post_json(self._url('/auth'), data=req_data)
if response.status_code == 200: if response.status_code == 200:
if 'auths' not in self._auth_configs: self._auth_configs.add_auth(registry or auth.INDEX_NAME, req_data)
self._auth_configs['auths'] = {}
self._auth_configs['auths'][registry or auth.INDEX_NAME] = req_data
return self._result(response, json=True) return self._result(response, json=True)
def ping(self): def ping(self):

View File

@ -43,7 +43,7 @@ def get_config_header(client, registry):
log.debug( log.debug(
"No auth config in memory - loading from filesystem" "No auth config in memory - loading from filesystem"
) )
client._auth_configs = load_config() client._auth_configs = load_config(credstore_env=client.credstore_env)
authcfg = resolve_authconfig( authcfg = resolve_authconfig(
client._auth_configs, registry, credstore_env=client.credstore_env client._auth_configs, registry, credstore_env=client.credstore_env
) )
@ -70,31 +70,163 @@ def split_repo_name(repo_name):
def get_credential_store(authconfig, registry): def get_credential_store(authconfig, registry):
if not registry or registry == INDEX_NAME: if not isinstance(authconfig, AuthConfig):
registry = 'https://index.docker.io/v1/' authconfig = AuthConfig(authconfig)
return authconfig.get_credential_store(registry)
return authconfig.get('credHelpers', {}).get(registry) or authconfig.get(
'credsStore' class AuthConfig(dict):
def __init__(self, dct, credstore_env=None):
if 'auths' not in dct:
dct['auths'] = {}
self.update(dct)
self._credstore_env = credstore_env
self._stores = {}
@classmethod
def parse_auth(cls, entries, raise_on_error=False):
"""
Parses authentication entries
Args:
entries: Dict of authentication entries.
raise_on_error: If set to true, an invalid format will raise
InvalidConfigFile
Returns:
Authentication registry.
"""
conf = {}
for registry, entry in six.iteritems(entries):
if not isinstance(entry, dict):
log.debug(
'Config entry for key {0} is not auth config'.format(
registry
)
)
# We sometimes fall back to parsing the whole config as if it
# was the auth config by itself, for legacy purposes. In that
# case, we fail silently and return an empty conf if any of the
# keys is not formatted properly.
if raise_on_error:
raise errors.InvalidConfigFile(
'Invalid configuration for registry {0}'.format(
registry
)
)
return {}
if 'identitytoken' in entry:
log.debug(
'Found an IdentityToken entry for registry {0}'.format(
registry
)
)
conf[registry] = {
'IdentityToken': entry['identitytoken']
}
continue # Other values are irrelevant if we have a token
if 'auth' not in entry:
# Starting with engine v1.11 (API 1.23), an empty dictionary is
# a valid value in the auths config.
# https://github.com/docker/compose/issues/3265
log.debug(
'Auth data for {0} is absent. Client might be using a '
'credentials store instead.'.format(registry)
)
conf[registry] = {}
continue
username, password = decode_auth(entry['auth'])
log.debug(
'Found entry (registry={0}, username={1})'
.format(repr(registry), repr(username))
) )
conf[registry] = {
'username': username,
'password': password,
'email': entry.get('email'),
'serveraddress': registry,
}
return conf
def resolve_authconfig(authconfig, registry=None, credstore_env=None): @classmethod
def load_config(cls, config_path, config_dict, credstore_env=None):
"""
Loads authentication data from a Docker configuration file in the given
root directory or if config_path is passed use given path.
Lookup priority:
explicit config_path parameter > DOCKER_CONFIG environment
variable > ~/.docker/config.json > ~/.dockercfg
"""
if not config_dict:
config_file = config.find_config_file(config_path)
if not config_file:
return cls({}, credstore_env)
try:
with open(config_file) as f:
config_dict = json.load(f)
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
return cls(_load_legacy_config(config_file), credstore_env)
res = {}
if config_dict.get('auths'):
log.debug("Found 'auths' section")
res.update({
'auths': cls.parse_auth(
config_dict.pop('auths'), raise_on_error=True
)
})
if config_dict.get('credsStore'):
log.debug("Found 'credsStore' section")
res.update({'credsStore': config_dict.pop('credsStore')})
if config_dict.get('credHelpers'):
log.debug("Found 'credHelpers' section")
res.update({'credHelpers': config_dict.pop('credHelpers')})
if res:
return cls(res, credstore_env)
log.debug(
"Couldn't find auth-related section ; attempting to interpret "
"as auth-only file"
)
return cls({'auths': cls.parse_auth(config_dict)}, credstore_env)
@property
def auths(self):
return self.get('auths', {})
@property
def creds_store(self):
return self.get('credsStore', None)
@property
def cred_helpers(self):
return self.get('credHelpers', {})
def resolve_authconfig(self, registry=None):
""" """
Returns the authentication data from the given auth configuration for a Returns the authentication data from the given auth configuration for a
specific registry. As with the Docker client, legacy entries in the config specific registry. As with the Docker client, legacy entries in the
with full URLs are stripped down to hostnames before checking for a match. config with full URLs are stripped down to hostnames before checking
Returns None if no match was found. for a match. Returns None if no match was found.
""" """
if 'credHelpers' in authconfig or 'credsStore' in authconfig: if self.creds_store or self.cred_helpers:
store_name = get_credential_store(authconfig, registry) store_name = self.get_credential_store(registry)
if store_name is not None: if store_name is not None:
log.debug( log.debug(
'Using credentials store "{0}"'.format(store_name) 'Using credentials store "{0}"'.format(store_name)
) )
cfg = _resolve_authconfig_credstore( cfg = self._resolve_authconfig_credstore(registry, store_name)
authconfig, registry, store_name, env=credstore_env
)
if cfg is not None: if cfg is not None:
return cfg return cfg
log.debug('No entry in credstore - fetching from auth dict') log.debug('No entry in credstore - fetching from auth dict')
@ -103,12 +235,11 @@ def resolve_authconfig(authconfig, registry=None, credstore_env=None):
registry = resolve_index_name(registry) if registry else INDEX_NAME registry = resolve_index_name(registry) if registry else INDEX_NAME
log.debug("Looking for auth entry for {0}".format(repr(registry))) log.debug("Looking for auth entry for {0}".format(repr(registry)))
authdict = authconfig.get('auths', {}) if registry in self.auths:
if registry in authdict:
log.debug("Found {0}".format(repr(registry))) log.debug("Found {0}".format(repr(registry)))
return authdict[registry] return self.auths[registry]
for key, conf in six.iteritems(authdict): for key, conf in six.iteritems(self.auths):
if resolve_index_name(key) == registry: if resolve_index_name(key) == registry:
log.debug("Found {0}".format(repr(key))) log.debug("Found {0}".format(repr(key)))
return conf return conf
@ -116,15 +247,13 @@ def resolve_authconfig(authconfig, registry=None, credstore_env=None):
log.debug("No entry found") log.debug("No entry found")
return None return None
def _resolve_authconfig_credstore(self, registry, credstore_name):
def _resolve_authconfig_credstore(authconfig, registry, credstore_name,
env=None):
if not registry or registry == INDEX_NAME: if not registry or registry == INDEX_NAME:
# The ecosystem is a little schizophrenic with index.docker.io VS # The ecosystem is a little schizophrenic with index.docker.io VS
# docker.io - in that case, it seems the full URL is necessary. # docker.io - in that case, it seems the full URL is necessary.
registry = INDEX_URL registry = INDEX_URL
log.debug("Looking for auth entry for {0}".format(repr(registry))) log.debug("Looking for auth entry for {0}".format(repr(registry)))
store = dockerpycreds.Store(credstore_name, environment=env) store = self._get_store_instance(credstore_name)
try: try:
data = store.get(registry) data = store.get(registry)
res = { res = {
@ -146,6 +275,46 @@ def _resolve_authconfig_credstore(authconfig, registry, credstore_name,
'Credentials store error: {0}'.format(repr(e)) 'Credentials store error: {0}'.format(repr(e))
) )
def _get_store_instance(self, name):
if name not in self._stores:
self._stores[name] = dockerpycreds.Store(
name, environment=self._credstore_env
)
return self._stores[name]
def get_credential_store(self, registry):
if not registry or registry == INDEX_NAME:
registry = INDEX_URL
return self.cred_helpers.get(registry) or self.creds_store
def get_all_credentials(self):
auth_data = self.auths.copy()
if self.creds_store:
# Retrieve all credentials from the default store
store = self._get_store_instance(self.creds_store)
for k in store.list().keys():
auth_data[k] = self._resolve_authconfig_credstore(
k, self.creds_store
)
# credHelpers entries take priority over all others
for reg, store_name in self.cred_helpers.items():
auth_data[reg] = self._resolve_authconfig_credstore(
reg, store_name
)
return auth_data
def add_auth(self, reg, data):
self['auths'][reg] = data
def resolve_authconfig(authconfig, registry=None, credstore_env=None):
if not isinstance(authconfig, AuthConfig):
authconfig = AuthConfig(authconfig, credstore_env)
return authconfig.resolve_authconfig(registry)
def convert_to_hostname(url): def convert_to_hostname(url):
return url.replace('http://', '').replace('https://', '').split('/', 1)[0] return url.replace('http://', '').replace('https://', '').split('/', 1)[0]
@ -177,100 +346,11 @@ def parse_auth(entries, raise_on_error=False):
Authentication registry. Authentication registry.
""" """
conf = {} return AuthConfig.parse_auth(entries, raise_on_error)
for registry, entry in six.iteritems(entries):
if not isinstance(entry, dict):
log.debug(
'Config entry for key {0} is not auth config'.format(registry)
)
# We sometimes fall back to parsing the whole config as if it was
# the auth config by itself, for legacy purposes. In that case, we
# fail silently and return an empty conf if any of the keys is not
# formatted properly.
if raise_on_error:
raise errors.InvalidConfigFile(
'Invalid configuration for registry {0}'.format(registry)
)
return {}
if 'identitytoken' in entry:
log.debug('Found an IdentityToken entry for registry {0}'.format(
registry
))
conf[registry] = {
'IdentityToken': entry['identitytoken']
}
continue # Other values are irrelevant if we have a token, skip.
if 'auth' not in entry:
# Starting with engine v1.11 (API 1.23), an empty dictionary is
# a valid value in the auths config.
# https://github.com/docker/compose/issues/3265
log.debug(
'Auth data for {0} is absent. Client might be using a '
'credentials store instead.'.format(registry)
)
conf[registry] = {}
continue
username, password = decode_auth(entry['auth'])
log.debug(
'Found entry (registry={0}, username={1})'
.format(repr(registry), repr(username))
)
conf[registry] = {
'username': username,
'password': password,
'email': entry.get('email'),
'serveraddress': registry,
}
return conf
def load_config(config_path=None, config_dict=None): def load_config(config_path=None, config_dict=None, credstore_env=None):
""" return AuthConfig.load_config(config_path, config_dict, credstore_env)
Loads authentication data from a Docker configuration file in the given
root directory or if config_path is passed use given path.
Lookup priority:
explicit config_path parameter > DOCKER_CONFIG environment variable >
~/.docker/config.json > ~/.dockercfg
"""
if not config_dict:
config_file = config.find_config_file(config_path)
if not config_file:
return {}
try:
with open(config_file) as f:
config_dict = json.load(f)
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
return _load_legacy_config(config_file)
res = {}
if config_dict.get('auths'):
log.debug("Found 'auths' section")
res.update({
'auths': parse_auth(config_dict.pop('auths'), raise_on_error=True)
})
if config_dict.get('credsStore'):
log.debug("Found 'credsStore' section")
res.update({'credsStore': config_dict.pop('credsStore')})
if config_dict.get('credHelpers'):
log.debug("Found 'credHelpers' section")
res.update({'credHelpers': config_dict.pop('credHelpers')})
if res:
return res
log.debug(
"Couldn't find auth-related section ; attempting to interpret "
"as auth-only file"
)
return {'auths': parse_auth(config_dict)}
def _load_legacy_config(config_file): def _load_legacy_config(config_file):

View File

@ -4,7 +4,7 @@ backports.ssl-match-hostname==3.5.0.1
cffi==1.10.0 cffi==1.10.0
cryptography==1.9; python_version == '3.3' cryptography==1.9; python_version == '3.3'
cryptography==2.3; python_version > '3.3' cryptography==2.3; python_version > '3.3'
docker-pycreds==0.3.0 docker-pycreds==0.4.0
enum34==1.1.6 enum34==1.1.6
idna==2.5 idna==2.5
ipaddress==1.0.18 ipaddress==1.0.18

View File

@ -12,7 +12,7 @@ SOURCE_DIR = os.path.join(ROOT_DIR)
requirements = [ requirements = [
'six >= 1.4.0', 'six >= 1.4.0',
'websocket-client >= 0.32.0', 'websocket-client >= 0.32.0',
'docker-pycreds >= 0.3.0', 'docker-pycreds >= 0.4.0',
'requests >= 2.14.2, != 2.18.0', 'requests >= 2.14.2, != 2.18.0',
] ]

View File

@ -65,7 +65,7 @@ class BuildTest(BaseAPIClientTest):
) )
def test_build_remote_with_registry_auth(self): def test_build_remote_with_registry_auth(self):
self.client._auth_configs = { self.client._auth_configs = auth.AuthConfig({
'auths': { 'auths': {
'https://example.com': { 'https://example.com': {
'user': 'example', 'user': 'example',
@ -73,7 +73,7 @@ class BuildTest(BaseAPIClientTest):
'email': 'example@example.com' 'email': 'example@example.com'
} }
} }
} })
expected_params = {'t': None, 'q': False, 'dockerfile': None, expected_params = {'t': None, 'q': False, 'dockerfile': None,
'rm': False, 'nocache': False, 'pull': False, 'rm': False, 'nocache': False, 'pull': False,
@ -81,7 +81,7 @@ class BuildTest(BaseAPIClientTest):
'remote': 'https://github.com/docker-library/mongo'} 'remote': 'https://github.com/docker-library/mongo'}
expected_headers = { expected_headers = {
'X-Registry-Config': auth.encode_header( 'X-Registry-Config': auth.encode_header(
self.client._auth_configs['auths'] self.client._auth_configs.auths
) )
} }
@ -115,7 +115,7 @@ class BuildTest(BaseAPIClientTest):
}) })
def test_set_auth_headers_with_empty_dict_and_auth_configs(self): def test_set_auth_headers_with_empty_dict_and_auth_configs(self):
self.client._auth_configs = { self.client._auth_configs = auth.AuthConfig({
'auths': { 'auths': {
'https://example.com': { 'https://example.com': {
'user': 'example', 'user': 'example',
@ -123,12 +123,12 @@ class BuildTest(BaseAPIClientTest):
'email': 'example@example.com' 'email': 'example@example.com'
} }
} }
} })
headers = {} headers = {}
expected_headers = { expected_headers = {
'X-Registry-Config': auth.encode_header( 'X-Registry-Config': auth.encode_header(
self.client._auth_configs['auths'] self.client._auth_configs.auths
) )
} }
@ -136,7 +136,7 @@ class BuildTest(BaseAPIClientTest):
assert headers == expected_headers assert headers == expected_headers
def test_set_auth_headers_with_dict_and_auth_configs(self): def test_set_auth_headers_with_dict_and_auth_configs(self):
self.client._auth_configs = { self.client._auth_configs = auth.AuthConfig({
'auths': { 'auths': {
'https://example.com': { 'https://example.com': {
'user': 'example', 'user': 'example',
@ -144,12 +144,12 @@ class BuildTest(BaseAPIClientTest):
'email': 'example@example.com' 'email': 'example@example.com'
} }
} }
} })
headers = {'foo': 'bar'} headers = {'foo': 'bar'}
expected_headers = { expected_headers = {
'X-Registry-Config': auth.encode_header( 'X-Registry-Config': auth.encode_header(
self.client._auth_configs['auths'] self.client._auth_configs.auths
), ),
'foo': 'bar' 'foo': 'bar'
} }

View File

@ -222,14 +222,12 @@ class DockerApiTest(BaseAPIClientTest):
'username': 'sakuya', 'password': 'izayoi' 'username': 'sakuya', 'password': 'izayoi'
} }
assert args[1]['headers'] == {'Content-Type': 'application/json'} assert args[1]['headers'] == {'Content-Type': 'application/json'}
assert self.client._auth_configs['auths'] == { assert self.client._auth_configs.auths['docker.io'] == {
'docker.io': {
'email': None, 'email': None,
'password': 'izayoi', 'password': 'izayoi',
'username': 'sakuya', 'username': 'sakuya',
'serveraddress': None, 'serveraddress': None,
} }
}
def test_events(self): def test_events(self):
self.client.events() self.client.events()

View File

@ -10,6 +10,7 @@ import tempfile
import unittest import unittest
from docker import auth, errors from docker import auth, errors
import dockerpycreds
import pytest import pytest
try: try:
@ -106,13 +107,13 @@ class ResolveAuthTest(unittest.TestCase):
private_config = {'auth': encode_auth({'username': 'privateuser'})} private_config = {'auth': encode_auth({'username': 'privateuser'})}
legacy_config = {'auth': encode_auth({'username': 'legacyauth'})} legacy_config = {'auth': encode_auth({'username': 'legacyauth'})}
auth_config = { auth_config = auth.AuthConfig({
'auths': auth.parse_auth({ 'auths': auth.parse_auth({
'https://index.docker.io/v1/': index_config, 'https://index.docker.io/v1/': index_config,
'my.registry.net': private_config, 'my.registry.net': private_config,
'http://legacy.registry.url/v1/': legacy_config, 'http://legacy.registry.url/v1/': legacy_config,
}) })
} })
def test_resolve_authconfig_hostname_only(self): def test_resolve_authconfig_hostname_only(self):
assert auth.resolve_authconfig( assert auth.resolve_authconfig(
@ -211,70 +212,21 @@ class ResolveAuthTest(unittest.TestCase):
) is None ) is None
def test_resolve_auth_with_empty_credstore_and_auth_dict(self): def test_resolve_auth_with_empty_credstore_and_auth_dict(self):
auth_config = { auth_config = auth.AuthConfig({
'auths': auth.parse_auth({ 'auths': auth.parse_auth({
'https://index.docker.io/v1/': self.index_config, 'https://index.docker.io/v1/': self.index_config,
}), }),
'credsStore': 'blackbox' 'credsStore': 'blackbox'
} })
with mock.patch('docker.auth._resolve_authconfig_credstore') as m: with mock.patch(
'docker.auth.AuthConfig._resolve_authconfig_credstore'
) as m:
m.return_value = None m.return_value = None
assert 'indexuser' == auth.resolve_authconfig( assert 'indexuser' == auth.resolve_authconfig(
auth_config, None auth_config, None
)['username'] )['username']
class CredStoreTest(unittest.TestCase):
def test_get_credential_store(self):
auth_config = {
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
'credsStore': 'blackbox',
}
assert auth.get_credential_store(
auth_config, 'registry1.io'
) == 'truesecret'
assert auth.get_credential_store(
auth_config, 'registry2.io'
) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'registry3.io'
) == 'blackbox'
def test_get_credential_store_no_default(self):
auth_config = {
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
}
assert auth.get_credential_store(
auth_config, 'registry2.io'
) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'registry3.io'
) is None
def test_get_credential_store_default_index(self):
auth_config = {
'credHelpers': {
'https://index.docker.io/v1/': 'powerlock'
},
'credsStore': 'truesecret'
}
assert auth.get_credential_store(auth_config, None) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'docker.io'
) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'images.io'
) == 'truesecret'
class LoadConfigTest(unittest.TestCase): class LoadConfigTest(unittest.TestCase):
def test_load_config_no_file(self): def test_load_config_no_file(self):
folder = tempfile.mkdtemp() folder = tempfile.mkdtemp()
@ -293,8 +245,8 @@ class LoadConfigTest(unittest.TestCase):
cfg = auth.load_config(cfg_path) cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None assert auth.resolve_authconfig(cfg) is not None
assert cfg['auths'][auth.INDEX_NAME] is not None assert cfg.auths[auth.INDEX_NAME] is not None
cfg = cfg['auths'][auth.INDEX_NAME] cfg = cfg.auths[auth.INDEX_NAME]
assert cfg['username'] == 'sakuya' assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi' assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net' assert cfg['email'] == 'sakuya@scarlet.net'
@ -312,8 +264,8 @@ class LoadConfigTest(unittest.TestCase):
) )
cfg = auth.load_config(cfg_path) cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None assert auth.resolve_authconfig(cfg) is not None
assert cfg['auths'][auth.INDEX_URL] is not None assert cfg.auths[auth.INDEX_URL] is not None
cfg = cfg['auths'][auth.INDEX_URL] cfg = cfg.auths[auth.INDEX_URL]
assert cfg['username'] == 'sakuya' assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi' assert cfg['password'] == 'izayoi'
assert cfg['email'] == email assert cfg['email'] == email
@ -335,8 +287,8 @@ class LoadConfigTest(unittest.TestCase):
}, f) }, f)
cfg = auth.load_config(cfg_path) cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None assert auth.resolve_authconfig(cfg) is not None
assert cfg['auths'][auth.INDEX_URL] is not None assert cfg.auths[auth.INDEX_URL] is not None
cfg = cfg['auths'][auth.INDEX_URL] cfg = cfg.auths[auth.INDEX_URL]
assert cfg['username'] == 'sakuya' assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi' assert cfg['password'] == 'izayoi'
assert cfg['email'] == email assert cfg['email'] == email
@ -360,7 +312,7 @@ class LoadConfigTest(unittest.TestCase):
with open(dockercfg_path, 'w') as f: with open(dockercfg_path, 'w') as f:
json.dump(config, f) json.dump(config, f)
cfg = auth.load_config(dockercfg_path)['auths'] cfg = auth.load_config(dockercfg_path).auths
assert registry in cfg assert registry in cfg
assert cfg[registry] is not None assert cfg[registry] is not None
cfg = cfg[registry] cfg = cfg[registry]
@ -387,7 +339,7 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f) json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)['auths'] cfg = auth.load_config(None).auths
assert registry in cfg assert registry in cfg
assert cfg[registry] is not None assert cfg[registry] is not None
cfg = cfg[registry] cfg = cfg[registry]
@ -417,8 +369,8 @@ class LoadConfigTest(unittest.TestCase):
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None) cfg = auth.load_config(None)
assert registry in cfg['auths'] assert registry in cfg.auths
cfg = cfg['auths'][registry] cfg = cfg.auths[registry]
assert cfg['username'] == 'sakuya' assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi' assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net' assert cfg['email'] == 'sakuya@scarlet.net'
@ -446,8 +398,8 @@ class LoadConfigTest(unittest.TestCase):
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None) cfg = auth.load_config(None)
assert registry in cfg['auths'] assert registry in cfg.auths
cfg = cfg['auths'][registry] cfg = cfg.auths[registry]
assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8') assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8')
assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8') assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8')
assert cfg['email'] == 'sakuya@scarlet.net' assert cfg['email'] == 'sakuya@scarlet.net'
@ -464,7 +416,7 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f) json.dump(config, f)
cfg = auth.load_config(dockercfg_path) cfg = auth.load_config(dockercfg_path)
assert cfg == {'auths': {}} assert dict(cfg) == {'auths': {}}
def test_load_config_invalid_auth_dict(self): def test_load_config_invalid_auth_dict(self):
folder = tempfile.mkdtemp() folder = tempfile.mkdtemp()
@ -479,7 +431,7 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f) json.dump(config, f)
cfg = auth.load_config(dockercfg_path) cfg = auth.load_config(dockercfg_path)
assert cfg == {'auths': {'scarlet.net': {}}} assert dict(cfg) == {'auths': {'scarlet.net': {}}}
def test_load_config_identity_token(self): def test_load_config_identity_token(self):
folder = tempfile.mkdtemp() folder = tempfile.mkdtemp()
@ -500,7 +452,236 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f) json.dump(config, f)
cfg = auth.load_config(dockercfg_path) cfg = auth.load_config(dockercfg_path)
assert registry in cfg['auths'] assert registry in cfg.auths
cfg = cfg['auths'][registry] cfg = cfg.auths[registry]
assert 'IdentityToken' in cfg assert 'IdentityToken' in cfg
assert cfg['IdentityToken'] == token assert cfg['IdentityToken'] == token
class CredstoreTest(unittest.TestCase):
def setUp(self):
self.authconfig = auth.AuthConfig({'credsStore': 'default'})
self.default_store = InMemoryStore('default')
self.authconfig._stores['default'] = self.default_store
self.default_store.store(
'https://gensokyo.jp/v2', 'sakuya', 'izayoi',
)
self.default_store.store(
'https://default.com/v2', 'user', 'hunter2',
)
def test_get_credential_store(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
'credsStore': 'blackbox',
})
assert auth_config.get_credential_store('registry1.io') == 'truesecret'
assert auth_config.get_credential_store('registry2.io') == 'powerlock'
assert auth_config.get_credential_store('registry3.io') == 'blackbox'
def test_get_credential_store_no_default(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
})
assert auth_config.get_credential_store('registry2.io') == 'powerlock'
assert auth_config.get_credential_store('registry3.io') is None
def test_get_credential_store_default_index(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'https://index.docker.io/v1/': 'powerlock'
},
'credsStore': 'truesecret'
})
assert auth_config.get_credential_store(None) == 'powerlock'
assert auth_config.get_credential_store('docker.io') == 'powerlock'
assert auth_config.get_credential_store('images.io') == 'truesecret'
def test_get_credential_store_with_plain_dict(self):
auth_config = {
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
'credsStore': 'blackbox',
}
assert auth.get_credential_store(
auth_config, 'registry1.io'
) == 'truesecret'
assert auth.get_credential_store(
auth_config, 'registry2.io'
) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'registry3.io'
) == 'blackbox'
def test_get_all_credentials_credstore_only(self):
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_with_empty_credhelper(self):
self.authconfig['credHelpers'] = {
'registry1.io': 'truesecret',
}
self.authconfig._stores['truesecret'] = InMemoryStore()
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': None,
}
def test_get_all_credentials_with_credhelpers_only(self):
del self.authconfig['credsStore']
assert self.authconfig.get_all_credentials() == {}
self.authconfig['credHelpers'] = {
'https://gensokyo.jp/v2': 'default',
'https://default.com/v2': 'default',
}
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_with_auths_entries(self):
self.authconfig.add_auth('registry1.io', {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
},
}
def test_get_all_credentials_helpers_override_default(self):
self.authconfig['credHelpers'] = {
'https://default.com/v2': 'truesecret',
}
truesecret = InMemoryStore('truesecret')
truesecret.store('https://default.com/v2', 'reimu', 'hakurei')
self.authconfig._stores['truesecret'] = truesecret
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'reimu',
'Password': 'hakurei',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_3_sources(self):
self.authconfig['credHelpers'] = {
'registry1.io': 'truesecret',
}
truesecret = InMemoryStore('truesecret')
truesecret.store('registry1.io', 'reimu', 'hakurei')
self.authconfig._stores['truesecret'] = truesecret
self.authconfig.add_auth('registry2.io', {
'ServerAddress': 'registry2.io',
'Username': 'reimu',
'Password': 'hakurei',
})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
},
'registry2.io': {
'ServerAddress': 'registry2.io',
'Username': 'reimu',
'Password': 'hakurei',
}
}
class InMemoryStore(dockerpycreds.Store):
def __init__(self, *args, **kwargs):
self.__store = {}
def get(self, server):
try:
return self.__store[server]
except KeyError:
raise dockerpycreds.errors.CredentialsNotFound()
def store(self, server, username, secret):
self.__store[server] = {
'ServerURL': server,
'Username': username,
'Secret': secret,
}
def list(self):
return dict(
[(k, v['Username']) for k, v in self.__store.items()]
)
def erase(self, server):
del self.__store[server]