mirror of https://github.com/docker/docker-py.git
108 lines
3.7 KiB
Python
108 lines
3.7 KiB
Python
import errno
|
|
import json
|
|
import subprocess
|
|
|
|
import six
|
|
|
|
from . import constants
|
|
from . import errors
|
|
from .utils import create_environment_dict
|
|
from .utils import find_executable
|
|
|
|
|
|
class Store(object):
    """ Thin wrapper around an external credentials helper executable.

        Every operation spawns the helper with a single sub-command
        (`get`, `store`, `erase` or `list`), writes the request to its
        stdin and reads the reply from its stdout.
    """

    def __init__(self, program, environment=None):
        """ Create a store object that acts as an interface to
            perform the basic operations for storing, retrieving
            and erasing credentials using `program`.

            :param program: helper name; `constants.PROGRAM_PREFIX` is
                prepended to obtain the executable name.
            :param environment: value handed to `create_environment_dict`
                on every helper invocation to build its environment.
            :raises errors.InitializationError: if `find_executable` cannot
                locate the helper.
        """
        self.program = constants.PROGRAM_PREFIX + program
        self.exe = find_executable(self.program)
        self.environment = environment
        if self.exe is None:
            raise errors.InitializationError(
                '{} not installed or not available in PATH'.format(
                    self.program
                )
            )

    @staticmethod
    def _as_text(value):
        """ Decode a UTF-8 byte string to text; return other values as is. """
        # `bytes` is an alias of `str` on Python 2, so this check is
        # equivalent to `six.binary_type` on both major versions.
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def get(self, server):
        """ Retrieve credentials for `server`.

            :param server: server identifier, as text or UTF-8 bytes.
            :returns: the JSON object printed by the helper, decoded.
            :raises errors.CredentialsNotFound: if the reply carries neither
                a username nor a secret. Errors raised by `_execute`
                propagate unchanged.
        """
        if not isinstance(server, bytes):
            server = server.encode('utf-8')
        data = self._execute('get', server)
        result = json.loads(data.decode('utf-8'))

        # docker-credential-pass will return an object for inexistent servers
        # whereas other helpers will exit with returncode != 0. For
        # consistency, if no significant data is returned,
        # raise CredentialsNotFound. Missing or null fields are treated the
        # same as empty ones instead of surfacing as a bare KeyError.
        if not result.get('Username') and not result.get('Secret'):
            raise errors.CredentialsNotFound(
                'No matching credentials in {}'.format(self.program)
            )

        return result

    def store(self, server, username, secret):
        """ Store credentials for `server`.

            Arguments may be text or UTF-8 bytes; bytes are decoded first
            because `json.dumps` rejects them on Python 3.

            :returns: whatever the helper wrote to stdout.
            :raises: errors raised by `_execute` propagate unchanged.
        """
        data_input = json.dumps({
            'ServerURL': self._as_text(server),
            'Username': self._as_text(username),
            'Secret': self._as_text(secret)
        }).encode('utf-8')
        return self._execute('store', data_input)

    def erase(self, server):
        """ Erase credentials for `server` (text or UTF-8 bytes).

            :raises: errors raised by `_execute` propagate unchanged.
        """
        if not isinstance(server, bytes):
            server = server.encode('utf-8')
        self._execute('erase', server)

    def list(self):
        """ List stored credentials. Requires v0.4.0+ of the helper.

            :returns: the helper's JSON reply, decoded.
        """
        data = self._execute('list', None)
        return json.loads(data.decode('utf-8'))

    def _execute(self, subcmd, data_input):
        """ Run the helper with `subcmd`, feeding `data_input` on stdin.

            :param subcmd: helper sub-command, passed as the only argument.
            :param data_input: bytes written to the helper's stdin, or None.
            :returns: the bytes the helper wrote to stdout.
            :raises: the exception built by `errors.process_store_error` when
                the helper exits with a non-zero status;
                `errors.StoreError` when the process cannot be spawned.
        """
        command = [self.exe, subcmd]
        env = create_environment_dict(self.environment)
        try:
            if six.PY3:
                output = subprocess.check_output(
                    command, input=data_input, env=env,
                )
            else:
                # Python 2's check_output has no `input` argument, so the
                # same behaviour is reproduced by hand with Popen.
                process = subprocess.Popen(
                    command, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, env=env,
                )
                output, _ = process.communicate(data_input)
                if process.returncode != 0:
                    # Report the real command, as check_output does on
                    # Python 3, rather than an empty placeholder.
                    raise subprocess.CalledProcessError(
                        returncode=process.returncode, cmd=command,
                        output=output
                    )
        except subprocess.CalledProcessError as e:
            raise errors.process_store_error(e, self.program)
        except OSError as e:
            # ENOENT means the executable vanished after __init__ found it.
            if e.errno == errno.ENOENT:
                raise errors.StoreError(
                    '{} not installed or not available in PATH'.format(
                        self.program
                    )
                )
            raise errors.StoreError(
                'Unexpected OS error "{}", errno={}'.format(
                    e.strerror, e.errno
                )
            )
        return output
|