diff --git a/docker/api/__init__.py b/docker/api/__init__.py new file mode 100644 index 00000000..836f07e3 --- /dev/null +++ b/docker/api/__init__.py @@ -0,0 +1,6 @@ +# flake8: noqa +from .build import BuildApiMixin +from .container import ContainerApiMixin +from .daemon import DaemonApiMixin +from .exec_api import ExecApiMixin +from .image import ImageApiMixin diff --git a/docker/api/build.py b/docker/api/build.py new file mode 100644 index 00000000..ce6fd465 --- /dev/null +++ b/docker/api/build.py @@ -0,0 +1,132 @@ +import logging +import os +import re + +from .. import constants +from .. import errors +from ..auth import auth +from ..utils import utils + + +log = logging.getLogger(__name__) + + +class BuildApiMixin(object): + def build(self, path=None, tag=None, quiet=False, fileobj=None, + nocache=False, rm=False, stream=False, timeout=None, + custom_context=False, encoding=None, pull=False, + forcerm=False, dockerfile=None, container_limits=None, + decode=False): + remote = context = headers = None + container_limits = container_limits or {} + if path is None and fileobj is None: + raise TypeError("Either path or fileobj needs to be provided.") + + for key in container_limits.keys(): + if key not in constants.CONTAINER_LIMITS_KEYS: + raise errors.DockerException( + 'Invalid container_limits key {0}'.format(key) + ) + + if custom_context: + if not fileobj: + raise TypeError("You must specify fileobj with custom_context") + context = fileobj + elif fileobj is not None: + context = utils.mkbuildcontext(fileobj) + elif path.startswith(('http://', 'https://', + 'git://', 'github.com/', 'git@')): + remote = path + elif not os.path.isdir(path): + raise TypeError("You must specify a directory to build in path") + else: + dockerignore = os.path.join(path, '.dockerignore') + exclude = None + if os.path.exists(dockerignore): + with open(dockerignore, 'r') as f: + exclude = list(filter(bool, f.read().splitlines())) + context = utils.tar(path, exclude=exclude, dockerfile=dockerfile) + + if utils.compare_version('1.8', self._version) >= 0: + stream = True + + if dockerfile and utils.compare_version('1.17', self._version) < 0: + raise errors.InvalidVersion( + 'dockerfile was only introduced in API version 1.17' + ) + + if utils.compare_version('1.19', self._version) < 0: + pull = 1 if pull else 0 + + u = self._url('/build') + params = { + 't': tag, + 'remote': remote, + 'q': quiet, + 'nocache': nocache, + 'rm': rm, + 'forcerm': forcerm, + 'pull': pull, + 'dockerfile': dockerfile, + } + params.update(container_limits) + + if context is not None: + headers = {'Content-Type': 'application/tar'} + if encoding: + headers['Content-Encoding'] = encoding + + if utils.compare_version('1.9', self._version) >= 0: + self._set_auth_headers(headers) + + response = self._post( + u, + data=context, + params=params, + headers=headers, + stream=stream, + timeout=timeout, + ) + + if context is not None and not custom_context: + context.close() + + if stream: + return self._stream_helper(response, decode=decode) + else: + output = self._result(response) + srch = r'Successfully built ([0-9a-f]+)' + match = re.search(srch, output) + if not match: + return None, output + return match.group(1), output + + def _set_auth_headers(self, headers): + log.debug('Looking for auth config') + + # If we don't have any auth data so far, try reloading the config + # file one more time in case anything showed up in there. + if not self._auth_configs: + log.debug("No auth config in memory - loading from filesystem") + self._auth_configs = auth.load_config() + + # Send the full auth configuration (if any exists), since the build + # could use any (or all) of the registries. + if self._auth_configs: + log.debug( + 'Sending auth config ({0})'.format( + ', '.join(repr(k) for k in self._auth_configs.keys()) + ) + ) + if headers is None: + headers = {} + if utils.compare_version('1.19', self._version) >= 0: + headers['X-Registry-Config'] = auth.encode_header( + self._auth_configs + ) + else: + headers['X-Registry-Config'] = auth.encode_header({ + 'configs': self._auth_configs + }) + else: + log.debug('No auth config found') diff --git a/docker/api/container.py b/docker/api/container.py new file mode 100644 index 00000000..f90e8a18 --- /dev/null +++ b/docker/api/container.py @@ -0,0 +1,352 @@ +import six +import warnings + +from .. import errors +from ..utils import utils, check_resource + + +class ContainerApiMixin(object): + @check_resource + def attach(self, container, stdout=True, stderr=True, + stream=False, logs=False): + params = { + 'logs': logs and 1 or 0, + 'stdout': stdout and 1 or 0, + 'stderr': stderr and 1 or 0, + 'stream': stream and 1 or 0, + } + u = self._url("/containers/{0}/attach", container) + response = self._post(u, params=params, stream=stream) + + return self._get_result(container, stream, response) + + @check_resource + def attach_socket(self, container, params=None, ws=False): + if params is None: + params = { + 'stdout': 1, + 'stderr': 1, + 'stream': 1 + } + + if ws: + return self._attach_websocket(container, params) + + u = self._url("/containers/{0}/attach", container) + return self._get_raw_response_socket(self.post( + u, None, params=self._attach_params(params), stream=True)) + + @check_resource + def commit(self, container, repository=None, tag=None, message=None, + author=None, conf=None): + params = { + 'container': container, + 'repo': repository, + 'tag': tag, + 'comment': message, + 'author': author + } + u = self._url("/commit") + return self._result(self._post_json(u, data=conf, params=params), + json=True) + + def containers(self, quiet=False, all=False, trunc=False, latest=False, + since=None, before=None, limit=-1, size=False, + filters=None): + params = { + 'limit': 1 if latest else limit, + 'all': 1 if all else 0, + 'size': 1 if size else 0, + 'trunc_cmd': 1 if trunc else 0, + 'since': since, + 'before': before + } + if filters: + params['filters'] = utils.convert_filters(filters) + u = self._url("/containers/json") + res = self._result(self._get(u, params=params), True) + + if quiet: + return [{'Id': x['Id']} for x in res] + if trunc: + for x in res: + x['Id'] = x['Id'][:12] + return res + + @check_resource + def copy(self, container, resource): + res = self._post_json( + self._url("/containers/{0}/copy".format(container)), + data={"Resource": resource}, + stream=True + ) + self._raise_for_status(res) + return res.raw + + def create_container(self, image, command=None, hostname=None, user=None, + detach=False, stdin_open=False, tty=False, + mem_limit=None, ports=None, environment=None, + dns=None, volumes=None, volumes_from=None, + network_disabled=False, name=None, entrypoint=None, + cpu_shares=None, working_dir=None, domainname=None, + memswap_limit=None, cpuset=None, host_config=None, + mac_address=None, labels=None, volume_driver=None): + + if isinstance(volumes, six.string_types): + volumes = [volumes, ] + + if host_config and utils.compare_version('1.15', self._version) < 0: + raise errors.InvalidVersion( + 'host_config is not supported in API < 1.15' + ) + + config = self.create_container_config( + image, command, hostname, user, detach, stdin_open, + tty, mem_limit, ports, environment, dns, volumes, volumes_from, + network_disabled, entrypoint, cpu_shares, working_dir, domainname, + memswap_limit, cpuset, host_config, mac_address, labels, + volume_driver + ) + return self.create_container_from_config(config, name) + + def create_container_config(self, *args, **kwargs): + return utils.create_container_config(self._version, *args, **kwargs) + + def create_container_from_config(self, config, name=None): + u = self._url("/containers/create") + params = { + 'name': name + } + res = self._post_json(u, data=config, params=params) + return self._result(res, True) + + def create_host_config(self, *args, **kwargs): + if not kwargs: + kwargs = {} + if 'version' in kwargs: + raise TypeError( + "create_host_config() got an unexpected " + "keyword argument 'version'" + ) + kwargs['version'] = self._version + return utils.create_host_config(*args, **kwargs) + + @check_resource + def diff(self, container): + return self._result( + self._get(self._url("/containers/{0}/changes", container)), True + ) + + @check_resource + def export(self, container): + res = self._get( + self._url("/containers/{0}/export", container), stream=True + ) + self._raise_for_status(res) + return res.raw + + @check_resource + def inspect_container(self, container): + return self._result( + self._get(self._url("/containers/{0}/json", container)), True + ) + + @check_resource + def kill(self, container, signal=None): + url = self._url("/containers/{0}/kill", container) + params = {} + if signal is not None: + params['signal'] = signal + res = self._post(url, params=params) + + self._raise_for_status(res) + + @check_resource + def logs(self, container, stdout=True, stderr=True, stream=False, + timestamps=False, tail='all'): + if utils.compare_version('1.11', self._version) >= 0: + params = {'stderr': stderr and 1 or 0, + 'stdout': stdout and 1 or 0, + 'timestamps': timestamps and 1 or 0, + 'follow': stream and 1 or 0, + } + if utils.compare_version('1.13', self._version) >= 0: + if tail != 'all' and (not isinstance(tail, int) or tail <= 0): + tail = 'all' + params['tail'] = tail + url = self._url("/containers/{0}/logs", container) + res = self._get(url, params=params, stream=stream) + return self._get_result(container, stream, res) + return self.attach( + container, + stdout=stdout, + stderr=stderr, + stream=stream, + logs=True + ) + + @check_resource + def pause(self, container): + url = self._url('/containers/{0}/pause', container) + res = self._post(url) + self._raise_for_status(res) + + @check_resource + def port(self, container, private_port): + res = self._get(self._url("/containers/{0}/json", container)) + self._raise_for_status(res) + json_ = res.json() + s_port = str(private_port) + h_ports = None + + # Port settings is None when the container is running with + # network_mode=host. + port_settings = json_.get('NetworkSettings', {}).get('Ports') + if port_settings is None: + return None + + h_ports = port_settings.get(s_port + '/udp') + if h_ports is None: + h_ports = port_settings.get(s_port + '/tcp') + + return h_ports + + @check_resource + def remove_container(self, container, v=False, link=False, force=False): + params = {'v': v, 'link': link, 'force': force} + res = self._delete( + self._url("/containers/{0}", container), params=params + ) + self._raise_for_status(res) + + @check_resource + def rename(self, container, name): + if utils.compare_version('1.17', self._version) < 0: + raise errors.InvalidVersion( + 'rename was only introduced in API version 1.17' + ) + url = self._url("/containers/{0}/rename", container) + params = {'name': name} + res = self._post(url, params=params) + self._raise_for_status(res) + + @check_resource + def resize(self, container, height, width): + params = {'h': height, 'w': width} + url = self._url("/containers/{0}/resize", container) + res = self._post(url, params=params) + self._raise_for_status(res) + + @check_resource + def restart(self, container, timeout=10): + params = {'t': timeout} + url = self._url("/containers/{0}/restart", container) + res = self._post(url, params=params) + self._raise_for_status(res) + + @check_resource + def start(self, container, binds=None, port_bindings=None, lxc_conf=None, + publish_all_ports=None, links=None, privileged=None, + dns=None, dns_search=None, volumes_from=None, network_mode=None, + restart_policy=None, cap_add=None, cap_drop=None, devices=None, + extra_hosts=None, read_only=None, pid_mode=None, ipc_mode=None, + security_opt=None, ulimits=None): + + if utils.compare_version('1.10', self._version) < 0: + if dns is not None: + raise errors.InvalidVersion( + 'dns is only supported for API version >= 1.10' + ) + if volumes_from is not None: + raise errors.InvalidVersion( + 'volumes_from is only supported for API version >= 1.10' + ) + + if utils.compare_version('1.15', self._version) < 0: + if security_opt is not None: + raise errors.InvalidVersion( + 'security_opt is only supported for API version >= 1.15' + ) + if ipc_mode: + raise errors.InvalidVersion( + 'ipc_mode is only supported for API version >= 1.15' + ) + + if utils.compare_version('1.17', self._version) < 0: + if read_only is not None: + raise errors.InvalidVersion( + 'read_only is only supported for API version >= 1.17' + ) + if pid_mode is not None: + raise errors.InvalidVersion( + 'pid_mode is only supported for API version >= 1.17' + ) + + if utils.compare_version('1.18', self._version) < 0: + if ulimits is not None: + raise errors.InvalidVersion( + 'ulimits is only supported for API version >= 1.18' + ) + + start_config_kwargs = dict( + binds=binds, port_bindings=port_bindings, lxc_conf=lxc_conf, + publish_all_ports=publish_all_ports, links=links, dns=dns, + privileged=privileged, dns_search=dns_search, cap_add=cap_add, + cap_drop=cap_drop, volumes_from=volumes_from, devices=devices, + network_mode=network_mode, restart_policy=restart_policy, + extra_hosts=extra_hosts, read_only=read_only, pid_mode=pid_mode, + ipc_mode=ipc_mode, security_opt=security_opt, ulimits=ulimits + ) + start_config = None + + if any(v is not None for v in start_config_kwargs.values()): + if utils.compare_version('1.15', self._version) > 0: + warnings.warn( + 'Passing host config parameters in start() is deprecated. ' + 'Please use host_config in create_container instead!', + DeprecationWarning + ) + start_config = self.create_host_config(**start_config_kwargs) + + url = self._url("/containers/{0}/start", container) + res = self._post_json(url, data=start_config) + self._raise_for_status(res) + + @check_resource + def stats(self, container, decode=None): + if utils.compare_version('1.17', self._version) < 0: + raise errors.InvalidVersion( + 'Stats retrieval is not supported in API < 1.17!') + + url = self._url("/containers/{0}/stats", container) + return self._stream_helper(self._get(url, stream=True), decode=decode) + + @check_resource + def stop(self, container, timeout=10): + params = {'t': timeout} + url = self._url("/containers/{0}/stop", container) + + res = self._post(url, params=params, + timeout=(timeout + (self.timeout or 0))) + self._raise_for_status(res) + + @check_resource + def top(self, container): + u = self._url("/containers/{0}/top", container) + return self._result(self._get(u), True) + + @check_resource + def unpause(self, container): + url = self._url('/containers/{0}/unpause', container) + res = self._post(url) + self._raise_for_status(res) + + @check_resource + def wait(self, container, timeout=None): + url = self._url("/containers/{0}/wait", container) + res = self._post(url, timeout=timeout) + self._raise_for_status(res) + json_ = res.json() + if 'StatusCode' in json_: + return json_['StatusCode'] + return -1 diff --git a/docker/api/daemon.py b/docker/api/daemon.py new file mode 100644 index 00000000..a149e5e3 --- /dev/null +++ b/docker/api/daemon.py @@ -0,0 +1,78 @@ +import os +import warnings +from datetime import datetime + +from ..auth import auth +from ..constants import INSECURE_REGISTRY_DEPRECATION_WARNING +from ..utils import utils + + +class DaemonApiMixin(object): + def events(self, since=None, until=None, filters=None, decode=None): + if isinstance(since, datetime): + since = utils.datetime_to_timestamp(since) + + if isinstance(until, datetime): + until = utils.datetime_to_timestamp(until) + + if filters: + filters = utils.convert_filters(filters) + + params = { + 'since': since, + 'until': until, + 'filters': filters + } + + return self._stream_helper( + self.get(self._url('/events'), params=params, stream=True), + decode=decode + ) + + def info(self): + return self._result(self._get(self._url("/info")), True) + + def login(self, username, password=None, email=None, registry=None, + reauth=False, insecure_registry=False, dockercfg_path=None): + if insecure_registry: + warnings.warn( + INSECURE_REGISTRY_DEPRECATION_WARNING.format('login()'), + DeprecationWarning + ) + + # If we don't have any auth data so far, try reloading the config file + # one more time in case anything showed up in there. + # If dockercfg_path is passed check to see if the config file exists, + # if so load that config. + if dockercfg_path and os.path.exists(dockercfg_path): + self._auth_configs = auth.load_config(dockercfg_path) + elif not self._auth_configs: + self._auth_configs = auth.load_config() + + registry = registry or auth.INDEX_URL + + authcfg = auth.resolve_authconfig(self._auth_configs, registry) + # If we found an existing auth config for this registry and username + # combination, we can return it immediately unless reauth is requested. + if authcfg and authcfg.get('username', None) == username \ + and not reauth: + return authcfg + + req_data = { + 'username': username, + 'password': password, + 'email': email, + 'serveraddress': registry, + } + + response = self._post_json(self._url('/auth'), data=req_data) + if response.status_code == 200: + self._auth_configs[registry] = req_data + return self._result(response, json=True) + + def ping(self): + return self._result(self._get(self._url('/_ping'))) + + def version(self, api_version=True): + url = self._url("/version", versioned_api=api_version) + return self._result(self._get(url), json=True) diff --git a/docker/api/exec_api.py b/docker/api/exec_api.py new file mode 100644 index 00000000..87eb143d --- /dev/null +++ b/docker/api/exec_api.py @@ -0,0 +1,76 @@ +import shlex + +import six + +from .. import errors +from ..utils import utils, check_resource + + +class ExecApiMixin(object): + @check_resource + def exec_create(self, container, cmd, stdout=True, stderr=True, tty=False, + privileged=False, user=''): + if utils.compare_version('1.15', self._version) < 0: + raise errors.InvalidVersion('Exec is not supported in API < 1.15') + if privileged and utils.compare_version('1.19', self._version) < 0: + raise errors.InvalidVersion( + 'Privileged exec is not supported in API < 1.19' + ) + if user and utils.compare_version('1.19', self._version) < 0: + raise errors.InvalidVersion( + 'User-specific exec is not supported in API < 1.19' + ) + if isinstance(cmd, six.string_types): + cmd = shlex.split(str(cmd)) + + data = { + 'Container': container, + 'User': user, + 'Privileged': privileged, + 'Tty': tty, + 'AttachStdin': False, + 'AttachStdout': stdout, + 'AttachStderr': stderr, + 'Cmd': cmd + } + + url = self._url('/containers/{0}/exec', container) + res = self._post_json(url, data=data) + return self._result(res, True) + + def exec_inspect(self, exec_id): + if utils.compare_version('1.16', self._version) < 0: + raise errors.InvalidVersion( + 'exec_inspect is not supported in API < 1.16' + ) + if isinstance(exec_id, dict): + exec_id = exec_id.get('Id') + res = self._get(self._url("/exec/{0}/json", exec_id)) + return self._result(res, True) + + def exec_resize(self, exec_id, height=None, width=None): + if utils.compare_version('1.15', self._version) < 0: + raise errors.InvalidVersion('Exec is not supported in API < 1.15') + if isinstance(exec_id, dict): + exec_id = exec_id.get('Id') + + params = {'h': height, 'w': width} + url = self._url("/exec/{0}/resize", exec_id) + res = self._post(url, params=params) + self._raise_for_status(res) + + def exec_start(self, exec_id, detach=False, tty=False, stream=False): + if utils.compare_version('1.15', self._version) < 0: + raise errors.InvalidVersion('Exec is not supported in API < 1.15') + if isinstance(exec_id, dict): + exec_id = exec_id.get('Id') + + data = { + 'Tty': tty, + 'Detach': detach + } + + res = self._post_json( + self._url('/exec/{0}/start', exec_id), data=data, stream=stream + ) + return self._get_result_tty(stream, res, tty) diff --git a/docker/api/image.py b/docker/api/image.py new file mode 100644 index 00000000..c6939ef4 --- /dev/null +++ b/docker/api/image.py @@ -0,0 +1,271 @@ +import logging +import six +import warnings + +from ..auth import auth +from ..constants import INSECURE_REGISTRY_DEPRECATION_WARNING +from ..utils import utils, check_resource +from .. import errors + +log = logging.getLogger(__name__) + + +class ImageApiMixin(object): + + @check_resource + def get_image(self, image): + res = self._get(self._url("/images/{0}/get", image), stream=True) + self._raise_for_status(res) + return res.raw + + @check_resource + def history(self, image): + res = self._get(self._url("/images/{0}/history", image)) + return self._result(res, True) + + def images(self, name=None, quiet=False, all=False, viz=False, + filters=None): + if viz: + if utils.compare_version('1.7', self._version) >= 0: + raise Exception('Viz output is not supported in API >= 1.7!') + return self._result(self._get(self._url("images/viz"))) + params = { + 'filter': name, + 'only_ids': 1 if quiet else 0, + 'all': 1 if all else 0, + } + if filters: + params['filters'] = utils.convert_filters(filters) + res = self._result(self._get(self._url("/images/json"), params=params), + True) + if quiet: + return [x['Id'] for x in res] + return res + + def import_image(self, src=None, repository=None, tag=None, image=None): + if src: + if isinstance(src, six.string_types): + try: + result = self.import_image_from_file( + src, repository=repository, tag=tag) + except IOError: + result = self.import_image_from_url( + src, repository=repository, tag=tag) + else: + result = self.import_image_from_data( + src, repository=repository, tag=tag) + elif image: + result = self.import_image_from_image( + image, repository=repository, tag=tag) + else: + raise Exception("Must specify a src or image") + + return result + + def import_image_from_data(self, data, repository=None, tag=None): + u = self._url("/images/create") + params = { + 'fromSrc': '-', + 'repo': repository, + 'tag': tag + } + headers = { + 'Content-Type': 'application/tar', + } + return self._result( + self._post(u, data=data, params=params, headers=headers)) + + def import_image_from_file(self, filename, repository=None, tag=None): + u = self._url("/images/create") + params = { + 'fromSrc': '-', + 'repo': repository, + 'tag': tag + } + headers = { + 'Content-Type': 'application/tar', + } + with open(filename, 'rb') as f: + return self._result( + self._post(u, data=f, params=params, headers=headers, + timeout=None)) + + def import_image_from_stream(self, stream, repository=None, tag=None): + u = self._url("/images/create") + params = { + 'fromSrc': '-', + 'repo': repository, + 'tag': tag + } + headers = { + 'Content-Type': 'application/tar', + 'Transfer-Encoding': 'chunked', + } + return self._result( + self._post(u, data=stream, params=params, headers=headers)) + + def import_image_from_url(self, url, repository=None, tag=None): + u = self._url("/images/create") + params = { + 'fromSrc': url, + 'repo': repository, + 'tag': tag + } + return self._result( + self._post(u, data=None, params=params)) + + def import_image_from_image(self, image, repository=None, tag=None): + u = self._url("/images/create") + params = { + 'fromImage': image, + 'repo': repository, + 'tag': tag + } + return self._result( + self._post(u, data=None, params=params)) + + @check_resource + def insert(self, image, url, path): + if utils.compare_version('1.12', self._version) >= 0: + raise errors.DeprecatedMethod( + 'insert is not available for API version >=1.12' + ) + api_url = self._url("/images/{0}/insert", image) + params = { + 'url': url, + 'path': path + } + return self._result(self._post(api_url, params=params)) + + @check_resource + def inspect_image(self, image): + return self._result( + self._get(self._url("/images/{0}/json", image)), True + ) + + def load_image(self, data): + res = self._post(self._url("/images/load"), data=data) + self._raise_for_status(res) + + def pull(self, repository, tag=None, stream=False, + insecure_registry=False, auth_config=None): + if insecure_registry: + warnings.warn( + INSECURE_REGISTRY_DEPRECATION_WARNING.format('pull()'), + DeprecationWarning + ) + + if not tag: + repository, tag = utils.parse_repository_tag(repository) + registry, repo_name = auth.resolve_repository_name(repository) + if repo_name.count(":") == 1: + repository, tag = repository.rsplit(":", 1) + + params = { + 'tag': tag, + 'fromImage': repository + } + headers = {} + + if utils.compare_version('1.5', self._version) >= 0: + # If we don't have any auth data so far, try reloading the config + # file one more time in case anything showed up in there. + if auth_config is None: + log.debug('Looking for auth config') + if not self._auth_configs: + log.debug( + "No auth config in memory - loading from filesystem") + self._auth_configs = auth.load_config() + authcfg = auth.resolve_authconfig(self._auth_configs, registry) + # Do not fail here if no authentication exists for this + # specific registry as we can have a readonly pull. Just + # put the header if we can. + if authcfg: + log.debug('Found auth config') + # auth_config needs to be a dict in the format used by + # auth.py username , password, serveraddress, email + headers['X-Registry-Auth'] = auth.encode_header( + authcfg + ) + else: + log.debug('No auth config found') + else: + log.debug('Sending supplied auth config') + headers['X-Registry-Auth'] = auth.encode_header(auth_config) + + response = self._post( + self._url('/images/create'), params=params, headers=headers, + stream=stream, timeout=None + ) + + self._raise_for_status(response) + + if stream: + return self._stream_helper(response) + + return self._result(response) + + def push(self, repository, tag=None, stream=False, + insecure_registry=False): + if insecure_registry: + warnings.warn( + INSECURE_REGISTRY_DEPRECATION_WARNING.format('push()'), + DeprecationWarning + ) + + if not tag: + repository, tag = utils.parse_repository_tag(repository) + registry, repo_name = auth.resolve_repository_name(repository) + u = self._url("/images/{0}/push", repository) + params = { + 'tag': tag + } + headers = {} + + if utils.compare_version('1.5', self._version) >= 0: + # If we don't have any auth data so far, try reloading the config + # file one more time in case anything showed up in there. + if not self._auth_configs: + self._auth_configs = auth.load_config() + authcfg = auth.resolve_authconfig(self._auth_configs, registry) + + # Do not fail here if no authentication exists for this specific + # registry as we can have a readonly pull. Just put the header if + # we can. + if authcfg: + headers['X-Registry-Auth'] = auth.encode_header(authcfg) + + response = self._post_json( + u, None, headers=headers, stream=stream, params=params + ) + + self._raise_for_status(response) + + if stream: + return self._stream_helper(response) + + return self._result(response) + + @check_resource + def remove_image(self, image, force=False, noprune=False): + params = {'force': force, 'noprune': noprune} + res = self._delete(self._url("/images/{0}", image), params=params) + self._raise_for_status(res) + + def search(self, term): + return self._result( + self._get(self._url("/images/search"), params={'term': term}), + True + ) + + @check_resource + def tag(self, image, repository, tag=None, force=False): + params = { + 'tag': tag, + 'repo': repository, + 'force': 1 if force else 0 + } + url = self._url("/images/{0}/tag", image) + res = self._post(url, params=params) + self._raise_for_status(res) + return res.status_code == 201 diff --git a/docker/client.py b/docker/client.py index 88bc50de..2eb859cf 100644 --- a/docker/client.py +++ b/docker/client.py @@ -12,882 +12,301 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import os -import re -import shlex -import warnings -from datetime import datetime +import json +import struct +import sys +import requests +import requests.exceptions import six +import websocket -from . import clientbase + +from . import api from . import constants from . import errors from .auth import auth +from .unixconn import unixconn +from .ssladapter import ssladapter from .utils import utils, check_resource -from .constants import INSECURE_REGISTRY_DEPRECATION_WARNING - -log = logging.getLogger(__name__) +from .tls import TLSConfig -class Client(clientbase.ClientBase): - @check_resource - def attach(self, container, stdout=True, stderr=True, - stream=False, logs=False): - params = { - 'logs': logs and 1 or 0, - 'stdout': stdout and 1 or 0, - 'stderr': stderr and 1 or 0, - 'stream': stream and 1 or 0, - } - u = self._url("/containers/{0}/attach", container) - response = self._post(u, params=params, stream=stream) +class Client( + requests.Session, + api.BuildApiMixin, + api.ContainerApiMixin, + api.DaemonApiMixin, + api.ExecApiMixin, + api.ImageApiMixin): + def __init__(self, base_url=None, version=None, + timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False): + super(Client, self).__init__() - return self._get_result(container, stream, response) + if tls and not base_url.startswith('https://'): + raise errors.TLSParameterError( + 'If using TLS, the base_url argument must begin with ' + '"https://".') - @check_resource - def attach_socket(self, container, params=None, ws=False): - if params is None: - params = { - 'stdout': 1, - 'stderr': 1, - 'stream': 1 - } + self.base_url = base_url + self.timeout = timeout - if ws: - return self._attach_websocket(container, params) + self._auth_configs = auth.load_config() - u = self._url("/containers/{0}/attach", container) - return self._get_raw_response_socket(self.post( - u, None, params=self._attach_params(params), stream=True)) + base_url = utils.parse_host(base_url, sys.platform) + if base_url.startswith('http+unix://'): + self._custom_adapter = unixconn.UnixAdapter(base_url, timeout) + self.mount('http+docker://', self._custom_adapter) + self.base_url = 'http+docker://localunixsocket' + else: + # Use SSLAdapter for the ability to specify SSL version + if isinstance(tls, TLSConfig): + tls.configure_client(self) + elif tls: + self._custom_adapter = ssladapter.SSLAdapter() + self.mount('https://', self._custom_adapter) + self.base_url = base_url - def build(self, path=None, tag=None, quiet=False, fileobj=None, - nocache=False, rm=False, stream=False, timeout=None, - custom_context=False, encoding=None, pull=False, - forcerm=False, dockerfile=None, container_limits=None, - decode=False): - remote = context = headers = None - container_limits = container_limits or {} - if path is None and fileobj is None: - raise TypeError("Either path or fileobj needs to be provided.") - - for key in container_limits.keys(): - if key not in constants.CONTAINER_LIMITS_KEYS: - raise errors.DockerException( - 'Invalid container_limits key {0}'.format(key) + # version detection needs to be after unix adapter mounting + if version is None: + self._version = constants.DEFAULT_DOCKER_API_VERSION + elif isinstance(version, six.string_types): + if version.lower() == 'auto': + self._version = self._retrieve_server_version() + else: + self._version = version + else: + raise errors.DockerException( + 'Version parameter must be a string or None. Found {0}'.format( + type(version).__name__ ) + ) - if custom_context: - if not fileobj: - raise TypeError("You must specify fileobj with custom_context") - context = fileobj - elif fileobj is not None: - context = utils.mkbuildcontext(fileobj) - elif path.startswith(('http://', 'https://', - 'git://', 'github.com/', 'git@')): - remote = path - elif not os.path.isdir(path): - raise TypeError("You must specify a directory to build in path") + def _retrieve_server_version(self): + try: + return self.version(api_version=False)["ApiVersion"] + except KeyError: + raise errors.DockerException( + 'Invalid response from docker daemon: key "ApiVersion"' + ' is missing.' + ) + except Exception as e: + raise errors.DockerException( + 'Error while fetching server API version: {0}'.format(e) + ) + + def _set_request_timeout(self, kwargs): + """Prepare the kwargs for an HTTP request by inserting the timeout + parameter, if not already present.""" + kwargs.setdefault('timeout', self.timeout) + return kwargs + + def _post(self, url, **kwargs): + return self.post(url, **self._set_request_timeout(kwargs)) + + def _get(self, url, **kwargs): + return self.get(url, **self._set_request_timeout(kwargs)) + + def _delete(self, url, **kwargs): + return self.delete(url, **self._set_request_timeout(kwargs)) + + def _url(self, pathfmt, resource_id=None, versioned_api=True): + if resource_id and not isinstance(resource_id, six.string_types): + raise ValueError( + 'Expected a resource ID string but found {0} ({1}) ' + 'instead'.format(resource_id, type(resource_id)) + ) + elif resource_id: + resource_id = six.moves.urllib.parse.quote_plus(resource_id) + + if versioned_api: + return '{0}/v{1}{2}'.format( + self.base_url, self._version, pathfmt.format(resource_id) + ) else: - dockerignore = os.path.join(path, '.dockerignore') - exclude = None - if os.path.exists(dockerignore): - with open(dockerignore, 'r') as f: - exclude = list(filter(bool, f.read().splitlines())) - context = utils.tar(path, exclude=exclude, dockerfile=dockerfile) - - if utils.compare_version('1.8', self._version) >= 0: - stream = True - - if dockerfile and utils.compare_version('1.17', self._version) < 0: - raise errors.InvalidVersion( - 'dockerfile was only introduced in API version 1.17' - ) - - if utils.compare_version('1.19', self._version) < 0: - pull = 1 if pull else 0 - - u = self._url('/build') - params = { - 't': tag, - 'remote': remote, - 'q': quiet, - 'nocache': nocache, - 'rm': rm, - 'forcerm': forcerm, - 'pull': pull, - 'dockerfile': dockerfile, - } - params.update(container_limits) - - if context is not None: - headers = {'Content-Type': 'application/tar'} - if encoding: - headers['Content-Encoding'] = encoding - - if utils.compare_version('1.9', self._version) >= 0: - log.debug('Looking for auth config') - - # If we don't have any auth data so far, try reloading the config - # file one more time in case anything showed up in there. - if not self._auth_configs: - log.debug("No auth config in memory - loading from filesystem") - self._auth_configs = auth.load_config() - - # Send the full auth configuration (if any exists), since the build - # could use any (or all) of the registries. - if self._auth_configs: - log.debug( - 'Sending auth config ({0})'.format( - ', '.join(repr(k) for k in self._auth_configs.keys()) - ) - ) - if headers is None: - headers = {} - if utils.compare_version('1.19', self._version) >= 0: - headers['X-Registry-Config'] = auth.encode_header( - self._auth_configs - ) - else: - headers['X-Registry-Config'] = auth.encode_header({ - 'configs': self._auth_configs - }) - else: - log.debug('No auth config found') - - response = self._post( - u, - data=context, - params=params, - headers=headers, - stream=stream, - timeout=timeout, - ) - - if context is not None and not custom_context: - context.close() - - if stream: - return self._stream_helper(response, decode=decode) - else: - output = self._result(response) - srch = r'Successfully built ([0-9a-f]+)' - match = re.search(srch, output) - if not match: - return None, output - return match.group(1), output - - @check_resource - def commit(self, container, repository=None, tag=None, message=None, - author=None, conf=None): - params = { - 'container': container, - 'repo': repository, - 'tag': tag, - 'comment': message, - 'author': author - } - u = self._url("/commit") - return self._result(self._post_json(u, data=conf, params=params), - json=True) - - def containers(self, quiet=False, all=False, trunc=False, latest=False, - since=None, before=None, limit=-1, size=False, - filters=None): - params = { - 'limit': 1 if latest else limit, - 'all': 1 if all else 0, - 'size': 1 if size else 0, - 'trunc_cmd': 1 if trunc else 0, - 'since': since, - 'before': before - } - if filters: - params['filters'] = utils.convert_filters(filters) - u = self._url("/containers/json") - res = self._result(self._get(u, params=params), True) - - if quiet: - return [{'Id': x['Id']} for x in res] - if trunc: - for x in res: - x['Id'] = x['Id'][:12] - return res - - @check_resource - def copy(self, container, resource): - res = self._post_json( - self._url("/containers/{0}/copy".format(container)), - data={"Resource": resource}, - stream=True - ) - self._raise_for_status(res) - return res.raw - - def create_container(self, image, command=None, hostname=None, user=None, - detach=False, stdin_open=False, tty=False, - mem_limit=None, ports=None, environment=None, - dns=None, volumes=None, volumes_from=None, - network_disabled=False, name=None, entrypoint=None, - cpu_shares=None, working_dir=None, domainname=None, - memswap_limit=None, cpuset=None, host_config=None, - mac_address=None, labels=None, volume_driver=None): - - if isinstance(volumes, six.string_types): - volumes = [volumes, ] - - if host_config and utils.compare_version('1.15', self._version) < 0: - raise errors.InvalidVersion( - 'host_config is not supported in API < 1.15' - ) - - config = self.create_container_config( - image, command, hostname, user, detach, stdin_open, - tty, mem_limit, ports, environment, dns, volumes, volumes_from, - network_disabled, entrypoint, cpu_shares, working_dir, domainname, - memswap_limit, cpuset, host_config, mac_address, labels, - volume_driver - ) - return self.create_container_from_config(config, name) - - def create_container_config(self, *args, **kwargs): - return utils.create_container_config(self._version, *args, **kwargs) - - def create_container_from_config(self, config, name=None): - u = self._url("/containers/create") - params = { - 'name': name - } - res = self._post_json(u, data=config, params=params) - return self._result(res, True) - - def create_host_config(self, *args, **kwargs): - if not kwargs: - kwargs = {} - if 'version' in kwargs: - raise TypeError( - "create_host_config() got an unexpected " - "keyword argument 'version'" - ) - kwargs['version'] = self._version - return utils.create_host_config(*args, **kwargs) - - @check_resource - def diff(self, container): - return self._result( - self._get(self._url("/containers/{0}/changes", container)), True - ) - - def events(self, since=None, until=None, filters=None, decode=None): - if isinstance(since, datetime): - since = utils.datetime_to_timestamp(since) - - if isinstance(until, datetime): - until = utils.datetime_to_timestamp(until) - - if filters: - filters = utils.convert_filters(filters) - - params = { - 'since': since, - 'until': until, - 'filters': filters - } - - return self._stream_helper( - self.get(self._url('/events'), params=params, stream=True), - decode=decode - ) - - @check_resource - def exec_create(self, container, cmd, stdout=True, stderr=True, tty=False, - privileged=False, user=''): - if utils.compare_version('1.15', self._version) < 0: - raise errors.InvalidVersion('Exec is not supported in API < 1.15') - if privileged and utils.compare_version('1.19', self._version) < 0: - raise errors.InvalidVersion( - 'Privileged exec is not supported in API < 1.19' - ) - if user and utils.compare_version('1.19', self._version) < 0: - raise errors.InvalidVersion( - 'User-specific exec is not supported in API < 1.19' - ) - if isinstance(cmd, six.string_types): - cmd = shlex.split(str(cmd)) - - data = { - 'Container': container, - 'User': user, - 'Privileged': privileged, - 'Tty': tty, - 'AttachStdin': False, - 'AttachStdout': stdout, - 'AttachStderr': stderr, - 'Cmd': cmd - } - - url = self._url('/containers/{0}/exec', container) - res = self._post_json(url, data=data) - return self._result(res, True) - - def exec_inspect(self, exec_id): - if utils.compare_version('1.16', self._version) < 0: - raise errors.InvalidVersion( - 'exec_inspect is not supported in API < 1.16' - ) - if isinstance(exec_id, dict): - exec_id = exec_id.get('Id') - res = self._get(self._url("/exec/{0}/json", exec_id)) - return self._result(res, True) - - def exec_resize(self, exec_id, height=None, width=None): - if utils.compare_version('1.15', self._version) < 0: - raise errors.InvalidVersion('Exec is not supported in API < 1.15') - if isinstance(exec_id, dict): - exec_id = exec_id.get('Id') - - params = {'h': height, 'w': width} - url = self._url("/exec/{0}/resize", exec_id) - res = self._post(url, params=params) - self._raise_for_status(res) - - def exec_start(self, exec_id, detach=False, tty=False, stream=False): - if utils.compare_version('1.15', self._version) < 0: - raise errors.InvalidVersion('Exec is not supported in API < 1.15') - if isinstance(exec_id, dict): - exec_id = exec_id.get('Id') - - data = { - 'Tty': tty, - 'Detach': detach - } - - res = self._post_json( - self._url('/exec/{0}/start', exec_id), data=data, stream=stream - ) - return self._get_result_tty(stream, res, tty) - - @check_resource - def export(self, container): - res = self._get( - self._url("/containers/{0}/export", container), stream=True - ) - self._raise_for_status(res) - return res.raw - - @check_resource - def get_image(self, image): - res = self._get(self._url("/images/{0}/get", image), stream=True) - self._raise_for_status(res) - return res.raw - - @check_resource - def history(self, image): - res = self._get(self._url("/images/{0}/history", image)) - return self._result(res, True) - - def images(self, name=None, quiet=False, all=False, viz=False, - filters=None): - if viz: - if utils.compare_version('1.7', self._version) >= 0: - raise Exception('Viz output is not supported in API >= 1.7!') - return self._result(self._get(self._url("images/viz"))) - params = { - 'filter': name, - 'only_ids': 1 if quiet else 0, - 'all': 1 if all else 0, - } - if filters: - params['filters'] = utils.convert_filters(filters) - res = self._result(self._get(self._url("/images/json"), params=params), - True) - if quiet: - return [x['Id'] for x in res] - return res - - def import_image(self, src=None, repository=None, tag=None, image=None): - if src: - if isinstance(src, six.string_types): - try: - result = self.import_image_from_file( - src, repository=repository, tag=tag) - except IOError: - result = self.import_image_from_url( - src, repository=repository, tag=tag) - else: - result = self.import_image_from_data( - src, repository=repository, tag=tag) - elif image: - result = self.import_image_from_image( - image, repository=repository, tag=tag) - else: - raise Exception("Must specify a src or image") - - return result - - def import_image_from_data(self, data, repository=None, tag=None): - u = self._url("/images/create") - params = { - 'fromSrc': '-', - 'repo': repository, - 'tag': tag - } - headers = { - 'Content-Type': 'application/tar', - } - return self._result( - self._post(u, data=data, params=params, headers=headers)) - - def import_image_from_file(self, filename, repository=None, tag=None): - u = self._url("/images/create") - params = { - 'fromSrc': '-', - 'repo': repository, - 'tag': tag - } - headers = { - 'Content-Type': 'application/tar', - } - with open(filename, 'rb') as f: - return self._result( - self._post(u, data=f, params=params, headers=headers, - timeout=None)) - - def import_image_from_stream(self, stream, repository=None, tag=None): - u = self._url("/images/create") - params = { - 'fromSrc': '-', - 'repo': repository, - 'tag': tag - } - headers = { - 'Content-Type': 'application/tar', - 'Transfer-Encoding': 'chunked', - } - return self._result( - self._post(u, data=stream, params=params, headers=headers)) - - def import_image_from_url(self, url, repository=None, tag=None): - u = self._url("/images/create") - params = { - 'fromSrc': url, - 'repo': repository, - 'tag': tag - } - return self._result( - self._post(u, data=None, params=params)) - - def import_image_from_image(self, image, repository=None, tag=None): - u = self._url("/images/create") - params = { - 'fromImage': image, - 'repo': repository, - 'tag': tag - } - return self._result( - self._post(u, data=None, params=params)) - - def info(self): - return self._result(self._get(self._url("/info")), - True) - - @check_resource - def insert(self, image, url, path): - if utils.compare_version('1.12', self._version) >= 0: - raise errors.DeprecatedMethod( - 'insert is not available for API version >=1.12' - ) - api_url = self._url("/images/{0}/insert", image) - params = { - 'url': url, - 'path': path - } - return self._result(self._post(api_url, params=params)) - - @check_resource - def inspect_container(self, container): - return self._result( - self._get(self._url("/containers/{0}/json", container)), True - ) - - @check_resource - def inspect_image(self, image): - return self._result( - self._get(self._url("/images/{0}/json", image)), True - ) - - @check_resource - def kill(self, container, signal=None): - url = self._url("/containers/{0}/kill", container) - params = {} - if signal is not None: - params['signal'] = signal - res = self._post(url, params=params) - - self._raise_for_status(res) - - def load_image(self, data): - res = self._post(self._url("/images/load"), data=data) - self._raise_for_status(res) - - def login(self, username, password=None, email=None, registry=None, - reauth=False, insecure_registry=False, dockercfg_path=None): - if insecure_registry: - warnings.warn( - INSECURE_REGISTRY_DEPRECATION_WARNING.format('login()'), - DeprecationWarning - ) - - # If we don't have any auth data so far, try reloading the config file - # one more time in case anything showed up in there. - # If dockercfg_path is passed check to see if the config file exists, - # if so load that config. - if dockercfg_path and os.path.exists(dockercfg_path): - self._auth_configs = auth.load_config(dockercfg_path) - elif not self._auth_configs: - self._auth_configs = auth.load_config() - - registry = registry or auth.INDEX_URL - - authcfg = auth.resolve_authconfig(self._auth_configs, registry) - # If we found an existing auth config for this registry and username - # combination, we can return it immediately unless reauth is requested. - if authcfg and authcfg.get('username', None) == username \ - and not reauth: - return authcfg - - req_data = { - 'username': username, - 'password': password, - 'email': email, - 'serveraddress': registry, - } - - response = self._post_json(self._url('/auth'), data=req_data) - if response.status_code == 200: - self._auth_configs[registry] = req_data - return self._result(response, json=True) - - @check_resource - def logs(self, container, stdout=True, stderr=True, stream=False, - timestamps=False, tail='all'): - if utils.compare_version('1.11', self._version) >= 0: - params = {'stderr': stderr and 1 or 0, - 'stdout': stdout and 1 or 0, - 'timestamps': timestamps and 1 or 0, - 'follow': stream and 1 or 0, - } - if utils.compare_version('1.13', self._version) >= 0: - if tail != 'all' and (not isinstance(tail, int) or tail <= 0): - tail = 'all' - params['tail'] = tail - url = self._url("/containers/{0}/logs", container) - res = self._get(url, params=params, stream=stream) - return self._get_result(container, stream, res) - return self.attach( - container, - stdout=stdout, - stderr=stderr, - stream=stream, - logs=True - ) - - @check_resource - def pause(self, container): - url = self._url('/containers/{0}/pause', container) - res = self._post(url) - self._raise_for_status(res) - - def ping(self): - return self._result(self._get(self._url('/_ping'))) - - @check_resource - def port(self, container, private_port): - res = self._get(self._url("/containers/{0}/json", container)) - self._raise_for_status(res) - json_ = res.json() - s_port = str(private_port) - h_ports = None - - # Port settings is None when the container is running with - # network_mode=host. - port_settings = json_.get('NetworkSettings', {}).get('Ports') - if port_settings is None: - return None - - h_ports = port_settings.get(s_port + '/udp') - if h_ports is None: - h_ports = port_settings.get(s_port + '/tcp') - - return h_ports - - def pull(self, repository, tag=None, stream=False, - insecure_registry=False, auth_config=None): - if insecure_registry: - warnings.warn( - INSECURE_REGISTRY_DEPRECATION_WARNING.format('pull()'), - DeprecationWarning - ) - - if not tag: - repository, tag = utils.parse_repository_tag(repository) - registry, repo_name = auth.resolve_repository_name(repository) - if repo_name.count(":") == 1: - repository, tag = repository.rsplit(":", 1) - - params = { - 'tag': tag, - 'fromImage': repository - } - headers = {} - - if utils.compare_version('1.5', self._version) >= 0: - # If we don't have any auth data so far, try reloading the config - # file one more time in case anything showed up in there. - if auth_config is None: - log.debug('Looking for auth config') - if not self._auth_configs: - log.debug( - "No auth config in memory - loading from filesystem") - self._auth_configs = auth.load_config() - authcfg = auth.resolve_authconfig(self._auth_configs, registry) - # Do not fail here if no authentication exists for this - # specific registry as we can have a readonly pull. Just - # put the header if we can. - if authcfg: - log.debug('Found auth config') - # auth_config needs to be a dict in the format used by - # auth.py username , password, serveraddress, email - headers['X-Registry-Auth'] = auth.encode_header( - authcfg - ) - else: - log.debug('No auth config found') - else: - log.debug('Sending supplied auth config') - headers['X-Registry-Auth'] = auth.encode_header(auth_config) - - response = self._post( - self._url('/images/create'), params=params, headers=headers, - stream=stream, timeout=None - ) - + return '{0}{1}'.format(self.base_url, pathfmt.format(resource_id)) + + def _raise_for_status(self, response, explanation=None): + """Raises stored :class:`APIError`, if one occurred.""" + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + raise errors.NotFound(e, response, explanation=explanation) + raise errors.APIError(e, response, explanation=explanation) + + def _result(self, response, json=False, binary=False): + assert not (json and binary) self._raise_for_status(response) - if stream: - return self._stream_helper(response) + if json: + return response.json() + if binary: + return response.content + return response.text - return self._result(response) + def _post_json(self, url, data, **kwargs): + # Go <1.1 can't unserialize null to a string + # so we do this disgusting thing here. + data2 = {} + if data is not None: + for k, v in six.iteritems(data): + if v is not None: + data2[k] = v - def push(self, repository, tag=None, stream=False, - insecure_registry=False): - if insecure_registry: - warnings.warn( - INSECURE_REGISTRY_DEPRECATION_WARNING.format('push()'), - DeprecationWarning - ) + if 'headers' not in kwargs: + kwargs['headers'] = {} + kwargs['headers']['Content-Type'] = 'application/json' + return self._post(url, data=json.dumps(data2), **kwargs) - if not tag: - repository, tag = utils.parse_repository_tag(repository) - registry, repo_name = auth.resolve_repository_name(repository) - u = self._url("/images/{0}/push", repository) - params = { - 'tag': tag + def _attach_params(self, override=None): + return override or { + 'stdout': 1, + 'stderr': 1, + 'stream': 1 } - headers = {} - if utils.compare_version('1.5', self._version) >= 0: - # If we don't have any auth data so far, try reloading the config - # file one more time in case anything showed up in there. - if not self._auth_configs: - self._auth_configs = auth.load_config() - authcfg = auth.resolve_authconfig(self._auth_configs, registry) + @check_resource + def _attach_websocket(self, container, params=None): + url = self._url("/containers/{0}/attach/ws", container) + req = requests.Request("POST", url, params=self._attach_params(params)) + full_url = req.prepare().url + full_url = full_url.replace("http://", "ws://", 1) + full_url = full_url.replace("https://", "wss://", 1) + return self._create_websocket_connection(full_url) - # Do not fail here if no authentication exists for this specific - # registry as we can have a readonly pull. Just put the header if - # we can. - if authcfg: - headers['X-Registry-Auth'] = auth.encode_header(authcfg) - - response = self._post_json( - u, None, headers=headers, stream=stream, params=params - ) + def _create_websocket_connection(self, url): + return websocket.create_connection(url) + def _get_raw_response_socket(self, response): self._raise_for_status(response) + if six.PY3: + sock = response.raw._fp.fp.raw + else: + sock = response.raw._fp.fp._sock + try: + # Keep a reference to the response to stop it being garbage + # collected. If the response is garbage collected, it will + # close TLS sockets. + sock._response = response + except AttributeError: + # UNIX sockets can't have attributes set on them, but that's + # fine because we won't be doing TLS over them + pass + return sock + + def _stream_helper(self, response, decode=False): + """Generator for data coming from a chunked-encoded HTTP response.""" + if response.raw._fp.chunked: + reader = response.raw + while not reader.closed: + # this read call will block until we get a chunk + data = reader.read(1) + if not data: + break + if reader._fp.chunk_left: + data += reader.read(reader._fp.chunk_left) + if decode: + if six.PY3: + data = data.decode('utf-8') + data = json.loads(data) + yield data + else: + # Response isn't chunked, meaning we probably + # encountered an error immediately + yield self._result(response) + + def _multiplexed_buffer_helper(self, response): + """A generator of multiplexed data blocks read from a buffered + response.""" + buf = self._result(response, binary=True) + walker = 0 + while True: + if len(buf[walker:]) < 8: + break + _, length = struct.unpack_from('>BxxxL', buf[walker:]) + start = walker + constants.STREAM_HEADER_SIZE_BYTES + end = start + length + walker = end + yield buf[start:end] + + def _multiplexed_response_stream_helper(self, response): + """A generator of multiplexed data blocks coming from a response + stream.""" + + # Disable timeout on the underlying socket to prevent + # Read timed out(s) for long running processes + socket = self._get_raw_response_socket(response) + if six.PY3: + socket._sock.settimeout(None) + else: + socket.settimeout(None) + + while True: + header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES) + if not header: + break + _, length = struct.unpack('>BxxxL', header) + if not length: + continue + data = response.raw.read(length) + if not data: + break + yield data + + def _stream_raw_result_old(self, response): + ''' Stream raw output for API versions below 1.6 ''' + self._raise_for_status(response) + for line in response.iter_lines(chunk_size=1, + decode_unicode=True): + # filter out keep-alive new lines + if line: + yield line + + def _stream_raw_result(self, response): + ''' Stream result for TTY-enabled container above API 1.6 ''' + self._raise_for_status(response) + for out in response.iter_content(chunk_size=1, decode_unicode=True): + yield out + + def _get_result(self, container, stream, res): + cont = self.inspect_container(container) + return self._get_result_tty(stream, res, cont['Config']['Tty']) + + def _get_result_tty(self, stream, res, is_tty): + # Stream multi-plexing was only introduced in API v1.6. Anything + # before that needs old-style streaming. + if utils.compare_version('1.6', self._version) < 0: + return self._stream_raw_result_old(res) + + # We should also use raw streaming (without keep-alives) + # if we're dealing with a tty-enabled container. + if is_tty: + return self._stream_raw_result(res) if stream else \ + self._result(res, binary=True) + + self._raise_for_status(res) + sep = six.binary_type() if stream: - return self._stream_helper(response) - - return self._result(response) - - @check_resource - def remove_container(self, container, v=False, link=False, force=False): - params = {'v': v, 'link': link, 'force': force} - res = self._delete( - self._url("/containers/{0}", container), params=params - ) - self._raise_for_status(res) - - @check_resource - def remove_image(self, image, force=False, noprune=False): - params = {'force': force, 'noprune': noprune} - res = self._delete(self._url("/images/{0}", image), params=params) - self._raise_for_status(res) - - @check_resource - def rename(self, container, name): - if utils.compare_version('1.17', self._version) < 0: - raise errors.InvalidVersion( - 'rename was only introduced in API version 1.17' + return self._multiplexed_response_stream_helper(res) + else: + return sep.join( + [x for x in self._multiplexed_buffer_helper(res)] ) - url = self._url("/containers/{0}/rename", container) - params = {'name': name} - res = self._post(url, params=params) - self._raise_for_status(res) - @check_resource - def resize(self, container, height, width): - params = {'h': height, 'w': width} - url = self._url("/containers/{0}/resize", container) - res = self._post(url, params=params) - self._raise_for_status(res) + def get_adapter(self, url): + try: + return super(Client, self).get_adapter(url) + except requests.exceptions.InvalidSchema as e: + if self._custom_adapter: + return self._custom_adapter + else: + raise e - @check_resource - def restart(self, container, timeout=10): - params = {'t': timeout} - url = self._url("/containers/{0}/restart", container) - res = self._post(url, params=params) - self._raise_for_status(res) - - def search(self, term): - return self._result( - self._get(self._url("/images/search"), params={'term': term}), - True - ) - - @check_resource - def start(self, container, binds=None, port_bindings=None, lxc_conf=None, - publish_all_ports=None, links=None, privileged=None, - dns=None, dns_search=None, volumes_from=None, network_mode=None, - restart_policy=None, cap_add=None, cap_drop=None, devices=None, - extra_hosts=None, read_only=None, pid_mode=None, ipc_mode=None, - security_opt=None, ulimits=None): - - if utils.compare_version('1.10', self._version) < 0: - if dns is not None: - raise errors.InvalidVersion( - 'dns is only supported for API version >= 1.10' - ) - if volumes_from is not None: - raise errors.InvalidVersion( - 'volumes_from is only supported for API version >= 1.10' - ) - - if utils.compare_version('1.15', self._version) < 0: - if security_opt is not None: - raise errors.InvalidVersion( - 'security_opt is only supported for API version >= 1.15' - ) - if ipc_mode: - raise errors.InvalidVersion( - 'ipc_mode is only supported for API version >= 1.15' - ) - - if utils.compare_version('1.17', self._version) < 0: - if read_only is not None: - raise errors.InvalidVersion( - 'read_only is only supported for API version >= 1.17' - ) - if pid_mode is not None: - raise errors.InvalidVersion( - 'pid_mode is only supported for API version >= 1.17' - ) - - if utils.compare_version('1.18', self._version) < 0: - if ulimits is not None: - raise errors.InvalidVersion( - 'ulimits is only supported for API version >= 1.18' - ) - - start_config_kwargs = dict( - binds=binds, port_bindings=port_bindings, lxc_conf=lxc_conf, - publish_all_ports=publish_all_ports, links=links, dns=dns, - privileged=privileged, dns_search=dns_search, cap_add=cap_add, - cap_drop=cap_drop, volumes_from=volumes_from, devices=devices, - network_mode=network_mode, restart_policy=restart_policy, - extra_hosts=extra_hosts, read_only=read_only, pid_mode=pid_mode, - ipc_mode=ipc_mode, security_opt=security_opt, ulimits=ulimits - ) - start_config = None - - if any(v is not None for v in start_config_kwargs.values()): - if utils.compare_version('1.15', self._version) > 0: - warnings.warn( - 'Passing host config parameters in start() is deprecated. ' - 'Please use host_config in create_container instead!', - DeprecationWarning - ) - start_config = self.create_host_config(**start_config_kwargs) - - url = self._url("/containers/{0}/start", container) - res = self._post_json(url, data=start_config) - self._raise_for_status(res) - - @check_resource - def stats(self, container, decode=None): - if utils.compare_version('1.17', self._version) < 0: - raise errors.InvalidVersion( - 'Stats retrieval is not supported in API < 1.17!') - - url = self._url("/containers/{0}/stats", container) - return self._stream_helper(self._get(url, stream=True), decode=decode) - - @check_resource - def stop(self, container, timeout=10): - params = {'t': timeout} - url = self._url("/containers/{0}/stop", container) - - res = self._post(url, params=params, - timeout=(timeout + (self.timeout or 0))) - self._raise_for_status(res) - - @check_resource - def tag(self, image, repository, tag=None, force=False): - params = { - 'tag': tag, - 'repo': repository, - 'force': 1 if force else 0 - } - url = self._url("/images/{0}/tag", image) - res = self._post(url, params=params) - self._raise_for_status(res) - return res.status_code == 201 - - @check_resource - def top(self, container): - u = self._url("/containers/{0}/top", container) - return self._result(self._get(u), True) - - def version(self, api_version=True): - url = self._url("/version", versioned_api=api_version) - return self._result(self._get(url), json=True) - - @check_resource - def unpause(self, container): - url = self._url('/containers/{0}/unpause', container) - res = self._post(url) - self._raise_for_status(res) - - @check_resource - def wait(self, container, timeout=None): - url = self._url("/containers/{0}/wait", container) - res = self._post(url, timeout=timeout) - self._raise_for_status(res) - json_ = res.json() - if 'StatusCode' in json_: - return json_['StatusCode'] - return -1 + @property + def api_version(self): + return self._version class AutoVersionClient(Client): diff --git a/docker/clientbase.py b/docker/clientbase.py deleted file mode 100644 index a70339a9..00000000 --- a/docker/clientbase.py +++ /dev/null @@ -1,288 +0,0 @@ -import json -import struct -import sys - -import requests -import requests.exceptions -import six -import websocket - - -from . import constants -from . import errors -from .auth import auth -from .unixconn import unixconn -from .ssladapter import ssladapter -from .utils import utils, check_resource -from .tls import TLSConfig - - -class ClientBase(requests.Session): - def __init__(self, base_url=None, version=None, - timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False): - super(ClientBase, self).__init__() - - if tls and not base_url.startswith('https://'): - raise errors.TLSParameterError( - 'If using TLS, the base_url argument must begin with ' - '"https://".') - - self.base_url = base_url - self.timeout = timeout - - self._auth_configs = auth.load_config() - - base_url = utils.parse_host(base_url, sys.platform) - if base_url.startswith('http+unix://'): - self._custom_adapter = unixconn.UnixAdapter(base_url, timeout) - self.mount('http+docker://', self._custom_adapter) - self.base_url = 'http+docker://localunixsocket' - else: - # Use SSLAdapter for the ability to specify SSL version - if isinstance(tls, TLSConfig): - tls.configure_client(self) - elif tls: - self._custom_adapter = ssladapter.SSLAdapter() - self.mount('https://', self._custom_adapter) - self.base_url = base_url - - # version detection needs to be after unix adapter mounting - if version is None: - self._version = constants.DEFAULT_DOCKER_API_VERSION - elif isinstance(version, six.string_types): - if version.lower() == 'auto': - self._version = self._retrieve_server_version() - else: - self._version = version - else: - raise errors.DockerException( - 'Version parameter must be a string or None. Found {0}'.format( - type(version).__name__ - ) - ) - - def _retrieve_server_version(self): - try: - return self.version(api_version=False)["ApiVersion"] - except KeyError: - raise errors.DockerException( - 'Invalid response from docker daemon: key "ApiVersion"' - ' is missing.' - ) - except Exception as e: - raise errors.DockerException( - 'Error while fetching server API version: {0}'.format(e) - ) - - def _set_request_timeout(self, kwargs): - """Prepare the kwargs for an HTTP request by inserting the timeout - parameter, if not already present.""" - kwargs.setdefault('timeout', self.timeout) - return kwargs - - def _post(self, url, **kwargs): - return self.post(url, **self._set_request_timeout(kwargs)) - - def _get(self, url, **kwargs): - return self.get(url, **self._set_request_timeout(kwargs)) - - def _delete(self, url, **kwargs): - return self.delete(url, **self._set_request_timeout(kwargs)) - - def _url(self, pathfmt, resource_id=None, versioned_api=True): - if resource_id and not isinstance(resource_id, six.string_types): - raise ValueError( - 'Expected a resource ID string but found {0} ({1}) ' - 'instead'.format(resource_id, type(resource_id)) - ) - elif resource_id: - resource_id = six.moves.urllib.parse.quote_plus(resource_id) - - if versioned_api: - return '{0}/v{1}{2}'.format( - self.base_url, self._version, pathfmt.format(resource_id) - ) - else: - return '{0}{1}'.format(self.base_url, pathfmt.format(resource_id)) - - def _raise_for_status(self, response, explanation=None): - """Raises stored :class:`APIError`, if one occurred.""" - try: - response.raise_for_status() - except requests.exceptions.HTTPError as e: - if e.response.status_code == 404: - raise errors.NotFound(e, response, explanation=explanation) - raise errors.APIError(e, response, explanation=explanation) - - def _result(self, response, json=False, binary=False): - assert not (json and binary) - self._raise_for_status(response) - - if json: - return response.json() - if binary: - return response.content - return response.text - - def _post_json(self, url, data, **kwargs): - # Go <1.1 can't unserialize null to a string - # so we do this disgusting thing here. - data2 = {} - if data is not None: - for k, v in six.iteritems(data): - if v is not None: - data2[k] = v - - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Content-Type'] = 'application/json' - return self._post(url, data=json.dumps(data2), **kwargs) - - def _attach_params(self, override=None): - return override or { - 'stdout': 1, - 'stderr': 1, - 'stream': 1 - } - - @check_resource - def _attach_websocket(self, container, params=None): - url = self._url("/containers/{0}/attach/ws", container) - req = requests.Request("POST", url, params=self._attach_params(params)) - full_url = req.prepare().url - full_url = full_url.replace("http://", "ws://", 1) - full_url = full_url.replace("https://", "wss://", 1) - return self._create_websocket_connection(full_url) - - def _create_websocket_connection(self, url): - return websocket.create_connection(url) - - def _get_raw_response_socket(self, response): - self._raise_for_status(response) - if six.PY3: - sock = response.raw._fp.fp.raw - else: - sock = response.raw._fp.fp._sock - try: - # Keep a reference to the response to stop it being garbage - # collected. If the response is garbage collected, it will - # close TLS sockets. - sock._response = response - except AttributeError: - # UNIX sockets can't have attributes set on them, but that's - # fine because we won't be doing TLS over them - pass - - return sock - - def _stream_helper(self, response, decode=False): - """Generator for data coming from a chunked-encoded HTTP response.""" - if response.raw._fp.chunked: - reader = response.raw - while not reader.closed: - # this read call will block until we get a chunk - data = reader.read(1) - if not data: - break - if reader._fp.chunk_left: - data += reader.read(reader._fp.chunk_left) - if decode: - if six.PY3: - data = data.decode('utf-8') - data = json.loads(data) - yield data - else: - # Response isn't chunked, meaning we probably - # encountered an error immediately - yield self._result(response) - - def _multiplexed_buffer_helper(self, response): - """A generator of multiplexed data blocks read from a buffered - response.""" - buf = self._result(response, binary=True) - walker = 0 - while True: - if len(buf[walker:]) < 8: - break - _, length = struct.unpack_from('>BxxxL', buf[walker:]) - start = walker + constants.STREAM_HEADER_SIZE_BYTES - end = start + length - walker = end - yield buf[start:end] - - def _multiplexed_response_stream_helper(self, response): - """A generator of multiplexed data blocks coming from a response - stream.""" - - # Disable timeout on the underlying socket to prevent - # Read timed out(s) for long running processes - socket = self._get_raw_response_socket(response) - if six.PY3: - socket._sock.settimeout(None) - else: - socket.settimeout(None) - - while True: - header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES) - if not header: - break - _, length = struct.unpack('>BxxxL', header) - if not length: - continue - data = response.raw.read(length) - if not data: - break - yield data - - def _stream_raw_result_old(self, response): - ''' Stream raw output for API versions below 1.6 ''' - self._raise_for_status(response) - for line in response.iter_lines(chunk_size=1, - decode_unicode=True): - # filter out keep-alive new lines - if line: - yield line - - def _stream_raw_result(self, response): - ''' Stream result for TTY-enabled container above API 1.6 ''' - self._raise_for_status(response) - for out in response.iter_content(chunk_size=1, decode_unicode=True): - yield out - - def _get_result(self, container, stream, res): - cont = self.inspect_container(container) - return self._get_result_tty(stream, res, cont['Config']['Tty']) - - def _get_result_tty(self, stream, res, is_tty): - # Stream multi-plexing was only introduced in API v1.6. Anything - # before that needs old-style streaming. - if utils.compare_version('1.6', self._version) < 0: - return self._stream_raw_result_old(res) - - # We should also use raw streaming (without keep-alives) - # if we're dealing with a tty-enabled container. - if is_tty: - return self._stream_raw_result(res) if stream else \ - self._result(res, binary=True) - - self._raise_for_status(res) - sep = six.binary_type() - if stream: - return self._multiplexed_response_stream_helper(res) - else: - return sep.join( - [x for x in self._multiplexed_buffer_helper(res)] - ) - - def get_adapter(self, url): - try: - return super(ClientBase, self).get_adapter(url) - except requests.exceptions.InvalidSchema as e: - if self._custom_adapter: - return self._custom_adapter - else: - raise e - - @property - def api_version(self): - return self._version diff --git a/tests/test.py b/tests/test.py index 9cf94a18..52a35038 100644 --- a/tests/test.py +++ b/tests/test.py @@ -45,7 +45,7 @@ except ImportError: import mock -DEFAULT_TIMEOUT_SECONDS = docker.client.constants.DEFAULT_TIMEOUT_SECONDS +DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS def response(status_code=200, content='', headers=None, reason=None, elapsed=0, @@ -81,7 +81,7 @@ def fake_resp(url, data=None, **kwargs): fake_request = mock.Mock(side_effect=fake_resp) url_prefix = 'http+docker://localunixsocket/v{0}/'.format( - docker.client.constants.DEFAULT_DOCKER_API_VERSION) + docker.constants.DEFAULT_DOCKER_API_VERSION) class Cleanup(object):