mirror of https://github.com/docker/docker-py.git
Merge pull request #652 from docker/clientbase_extraction
ClientBase extraction
This commit is contained in:
commit
f3453aa23f
232
docker/client.py
232
docker/client.py
|
@ -12,246 +12,22 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import struct
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import six
|
||||
import websocket
|
||||
|
||||
|
||||
from . import clientbase
|
||||
from . import constants
|
||||
from . import errors
|
||||
from .auth import auth
|
||||
from .unixconn import unixconn
|
||||
from .ssladapter import ssladapter
|
||||
from .utils import utils, check_resource
|
||||
from .tls import TLSConfig
|
||||
|
||||
|
||||
class Client(requests.Session):
|
||||
def __init__(self, base_url=None, version=None,
|
||||
timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
|
||||
super(Client, self).__init__()
|
||||
|
||||
if tls and not base_url.startswith('https://'):
|
||||
raise errors.TLSParameterError(
|
||||
'If using TLS, the base_url argument must begin with '
|
||||
'"https://".')
|
||||
|
||||
self.base_url = base_url
|
||||
self.timeout = timeout
|
||||
|
||||
self._auth_configs = auth.load_config()
|
||||
|
||||
base_url = utils.parse_host(base_url)
|
||||
if base_url.startswith('http+unix://'):
|
||||
self._adapter = unixconn.UnixAdapter(base_url, timeout)
|
||||
self.mount('http+docker://', self._adapter)
|
||||
self.base_url = 'http+docker://localunixsocket'
|
||||
else:
|
||||
# Use SSLAdapter for the ability to specify SSL version
|
||||
if isinstance(tls, TLSConfig):
|
||||
tls.configure_client(self)
|
||||
elif tls:
|
||||
self._adapter = ssladapter.SSLAdapter()
|
||||
self.mount('https://', self._adapter)
|
||||
self.base_url = base_url
|
||||
|
||||
# version detection needs to be after unix adapter mounting
|
||||
if version is None:
|
||||
self._version = constants.DEFAULT_DOCKER_API_VERSION
|
||||
elif isinstance(version, six.string_types):
|
||||
if version.lower() == 'auto':
|
||||
self._version = self._retrieve_server_version()
|
||||
else:
|
||||
self._version = version
|
||||
else:
|
||||
raise errors.DockerException(
|
||||
'Version parameter must be a string or None. Found {0}'.format(
|
||||
type(version).__name__
|
||||
)
|
||||
)
|
||||
|
||||
def _retrieve_server_version(self):
|
||||
try:
|
||||
return self.version(api_version=False)["ApiVersion"]
|
||||
except KeyError:
|
||||
raise errors.DockerException(
|
||||
'Invalid response from docker daemon: key "ApiVersion"'
|
||||
' is missing.'
|
||||
)
|
||||
except Exception as e:
|
||||
raise errors.DockerException(
|
||||
'Error while fetching server API version: {0}'.format(e)
|
||||
)
|
||||
|
||||
def _set_request_timeout(self, kwargs):
|
||||
"""Prepare the kwargs for an HTTP request by inserting the timeout
|
||||
parameter, if not already present."""
|
||||
kwargs.setdefault('timeout', self.timeout)
|
||||
return kwargs
|
||||
|
||||
def _post(self, url, **kwargs):
|
||||
return self.post(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _get(self, url, **kwargs):
|
||||
return self.get(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _delete(self, url, **kwargs):
|
||||
return self.delete(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _url(self, path, versioned_api=True):
|
||||
if versioned_api:
|
||||
return '{0}/v{1}{2}'.format(self.base_url, self._version, path)
|
||||
else:
|
||||
return '{0}{1}'.format(self.base_url, path)
|
||||
|
||||
def _raise_for_status(self, response, explanation=None):
|
||||
"""Raises stored :class:`APIError`, if one occurred."""
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
raise errors.APIError(e, response, explanation=explanation)
|
||||
|
||||
def _result(self, response, json=False, binary=False):
|
||||
assert not (json and binary)
|
||||
self._raise_for_status(response)
|
||||
|
||||
if json:
|
||||
return response.json()
|
||||
if binary:
|
||||
return response.content
|
||||
return response.text
|
||||
|
||||
def _post_json(self, url, data, **kwargs):
|
||||
# Go <1.1 can't unserialize null to a string
|
||||
# so we do this disgusting thing here.
|
||||
data2 = {}
|
||||
if data is not None:
|
||||
for k, v in six.iteritems(data):
|
||||
if v is not None:
|
||||
data2[k] = v
|
||||
|
||||
if 'headers' not in kwargs:
|
||||
kwargs['headers'] = {}
|
||||
kwargs['headers']['Content-Type'] = 'application/json'
|
||||
return self._post(url, data=json.dumps(data2), **kwargs)
|
||||
|
||||
def _attach_params(self, override=None):
|
||||
return override or {
|
||||
'stdout': 1,
|
||||
'stderr': 1,
|
||||
'stream': 1
|
||||
}
|
||||
|
||||
@check_resource
|
||||
def _attach_websocket(self, container, params=None):
|
||||
url = self._url("/containers/{0}/attach/ws".format(container))
|
||||
req = requests.Request("POST", url, params=self._attach_params(params))
|
||||
full_url = req.prepare().url
|
||||
full_url = full_url.replace("http://", "ws://", 1)
|
||||
full_url = full_url.replace("https://", "wss://", 1)
|
||||
return self._create_websocket_connection(full_url)
|
||||
|
||||
def _create_websocket_connection(self, url):
|
||||
return websocket.create_connection(url)
|
||||
|
||||
def _get_raw_response_socket(self, response):
|
||||
self._raise_for_status(response)
|
||||
if six.PY3:
|
||||
sock = response.raw._fp.fp.raw
|
||||
else:
|
||||
sock = response.raw._fp.fp._sock
|
||||
try:
|
||||
# Keep a reference to the response to stop it being garbage
|
||||
# collected. If the response is garbage collected, it will
|
||||
# close TLS sockets.
|
||||
sock._response = response
|
||||
except AttributeError:
|
||||
# UNIX sockets can't have attributes set on them, but that's
|
||||
# fine because we won't be doing TLS over them
|
||||
pass
|
||||
|
||||
return sock
|
||||
|
||||
def _stream_helper(self, response, decode=False):
|
||||
"""Generator for data coming from a chunked-encoded HTTP response."""
|
||||
if response.raw._fp.chunked:
|
||||
reader = response.raw
|
||||
while not reader.closed:
|
||||
# this read call will block until we get a chunk
|
||||
data = reader.read(1)
|
||||
if not data:
|
||||
break
|
||||
if reader._fp.chunk_left:
|
||||
data += reader.read(reader._fp.chunk_left)
|
||||
if decode:
|
||||
if six.PY3:
|
||||
data = data.decode('utf-8')
|
||||
data = json.loads(data)
|
||||
yield data
|
||||
else:
|
||||
# Response isn't chunked, meaning we probably
|
||||
# encountered an error immediately
|
||||
yield self._result(response)
|
||||
|
||||
def _multiplexed_buffer_helper(self, response):
|
||||
"""A generator of multiplexed data blocks read from a buffered
|
||||
response."""
|
||||
buf = self._result(response, binary=True)
|
||||
walker = 0
|
||||
while True:
|
||||
if len(buf[walker:]) < 8:
|
||||
break
|
||||
_, length = struct.unpack_from('>BxxxL', buf[walker:])
|
||||
start = walker + constants.STREAM_HEADER_SIZE_BYTES
|
||||
end = start + length
|
||||
walker = end
|
||||
yield buf[start:end]
|
||||
|
||||
def _multiplexed_response_stream_helper(self, response):
|
||||
"""A generator of multiplexed data blocks coming from a response
|
||||
stream."""
|
||||
|
||||
# Disable timeout on the underlying socket to prevent
|
||||
# Read timed out(s) for long running processes
|
||||
socket = self._get_raw_response_socket(response)
|
||||
if six.PY3:
|
||||
socket._sock.settimeout(None)
|
||||
else:
|
||||
socket.settimeout(None)
|
||||
|
||||
while True:
|
||||
header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
|
||||
if not header:
|
||||
break
|
||||
_, length = struct.unpack('>BxxxL', header)
|
||||
if not length:
|
||||
break
|
||||
data = response.raw.read(length)
|
||||
if not data:
|
||||
break
|
||||
yield data
|
||||
|
||||
@property
|
||||
def api_version(self):
|
||||
return self._version
|
||||
|
||||
def get_adapter(self, url):
|
||||
try:
|
||||
return super(Client, self).get_adapter(url)
|
||||
except requests.exceptions.InvalidSchema as e:
|
||||
if self._adapter:
|
||||
return self._adapter
|
||||
raise e
|
||||
|
||||
class Client(clientbase.ClientBase):
|
||||
@check_resource
|
||||
def attach(self, container, stdout=True, stderr=True,
|
||||
stream=False, logs=False):
|
||||
|
@ -745,7 +521,9 @@ class Client(requests.Session):
|
|||
@check_resource
|
||||
def inspect_image(self, image):
|
||||
return self._result(
|
||||
self._get(self._url("/images/{0}/json".format(image))),
|
||||
self._get(
|
||||
self._url("/images/{0}/json".format(image.replace('/', '%2F')))
|
||||
),
|
||||
True
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,235 @@
|
|||
import json
|
||||
import struct
|
||||
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import six
|
||||
import websocket
|
||||
|
||||
|
||||
from . import constants
|
||||
from . import errors
|
||||
from .auth import auth
|
||||
from .unixconn import unixconn
|
||||
from .ssladapter import ssladapter
|
||||
from .utils import utils, check_resource
|
||||
from .tls import TLSConfig
|
||||
|
||||
|
||||
class ClientBase(requests.Session):
|
||||
def __init__(self, base_url=None, version=None,
|
||||
timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
|
||||
super(ClientBase, self).__init__()
|
||||
|
||||
if tls and not base_url.startswith('https://'):
|
||||
raise errors.TLSParameterError(
|
||||
'If using TLS, the base_url argument must begin with '
|
||||
'"https://".')
|
||||
|
||||
self.base_url = base_url
|
||||
self.timeout = timeout
|
||||
|
||||
self._auth_configs = auth.load_config()
|
||||
|
||||
base_url = utils.parse_host(base_url)
|
||||
if base_url.startswith('http+unix://'):
|
||||
self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
|
||||
self.mount('http+docker://', self._custom_adapter)
|
||||
self.base_url = 'http+docker://localunixsocket'
|
||||
else:
|
||||
# Use SSLAdapter for the ability to specify SSL version
|
||||
if isinstance(tls, TLSConfig):
|
||||
tls.configure_client(self)
|
||||
elif tls:
|
||||
self._custom_adapter = ssladapter.SSLAdapter()
|
||||
self.mount('https://', self._custom_adapter)
|
||||
self.base_url = base_url
|
||||
|
||||
# version detection needs to be after unix adapter mounting
|
||||
if version is None:
|
||||
self._version = constants.DEFAULT_DOCKER_API_VERSION
|
||||
elif isinstance(version, six.string_types):
|
||||
if version.lower() == 'auto':
|
||||
self._version = self._retrieve_server_version()
|
||||
else:
|
||||
self._version = version
|
||||
else:
|
||||
raise errors.DockerException(
|
||||
'Version parameter must be a string or None. Found {0}'.format(
|
||||
type(version).__name__
|
||||
)
|
||||
)
|
||||
|
||||
def _retrieve_server_version(self):
|
||||
try:
|
||||
return self.version(api_version=False)["ApiVersion"]
|
||||
except KeyError:
|
||||
raise errors.DockerException(
|
||||
'Invalid response from docker daemon: key "ApiVersion"'
|
||||
' is missing.'
|
||||
)
|
||||
except Exception as e:
|
||||
raise errors.DockerException(
|
||||
'Error while fetching server API version: {0}'.format(e)
|
||||
)
|
||||
|
||||
def _set_request_timeout(self, kwargs):
|
||||
"""Prepare the kwargs for an HTTP request by inserting the timeout
|
||||
parameter, if not already present."""
|
||||
kwargs.setdefault('timeout', self.timeout)
|
||||
return kwargs
|
||||
|
||||
def _post(self, url, **kwargs):
|
||||
return self.post(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _get(self, url, **kwargs):
|
||||
return self.get(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _delete(self, url, **kwargs):
|
||||
return self.delete(url, **self._set_request_timeout(kwargs))
|
||||
|
||||
def _url(self, path, versioned_api=True):
|
||||
if versioned_api:
|
||||
return '{0}/v{1}{2}'.format(self.base_url, self._version, path)
|
||||
else:
|
||||
return '{0}{1}'.format(self.base_url, path)
|
||||
|
||||
def _raise_for_status(self, response, explanation=None):
|
||||
"""Raises stored :class:`APIError`, if one occurred."""
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
raise errors.APIError(e, response, explanation=explanation)
|
||||
|
||||
def _result(self, response, json=False, binary=False):
|
||||
assert not (json and binary)
|
||||
self._raise_for_status(response)
|
||||
|
||||
if json:
|
||||
return response.json()
|
||||
if binary:
|
||||
return response.content
|
||||
return response.text
|
||||
|
||||
def _post_json(self, url, data, **kwargs):
|
||||
# Go <1.1 can't unserialize null to a string
|
||||
# so we do this disgusting thing here.
|
||||
data2 = {}
|
||||
if data is not None:
|
||||
for k, v in six.iteritems(data):
|
||||
if v is not None:
|
||||
data2[k] = v
|
||||
|
||||
if 'headers' not in kwargs:
|
||||
kwargs['headers'] = {}
|
||||
kwargs['headers']['Content-Type'] = 'application/json'
|
||||
return self._post(url, data=json.dumps(data2), **kwargs)
|
||||
|
||||
def _attach_params(self, override=None):
|
||||
return override or {
|
||||
'stdout': 1,
|
||||
'stderr': 1,
|
||||
'stream': 1
|
||||
}
|
||||
|
||||
@check_resource
|
||||
def _attach_websocket(self, container, params=None):
|
||||
url = self._url("/containers/{0}/attach/ws".format(container))
|
||||
req = requests.Request("POST", url, params=self._attach_params(params))
|
||||
full_url = req.prepare().url
|
||||
full_url = full_url.replace("http://", "ws://", 1)
|
||||
full_url = full_url.replace("https://", "wss://", 1)
|
||||
return self._create_websocket_connection(full_url)
|
||||
|
||||
def _create_websocket_connection(self, url):
|
||||
return websocket.create_connection(url)
|
||||
|
||||
def _get_raw_response_socket(self, response):
|
||||
self._raise_for_status(response)
|
||||
if six.PY3:
|
||||
sock = response.raw._fp.fp.raw
|
||||
else:
|
||||
sock = response.raw._fp.fp._sock
|
||||
try:
|
||||
# Keep a reference to the response to stop it being garbage
|
||||
# collected. If the response is garbage collected, it will
|
||||
# close TLS sockets.
|
||||
sock._response = response
|
||||
except AttributeError:
|
||||
# UNIX sockets can't have attributes set on them, but that's
|
||||
# fine because we won't be doing TLS over them
|
||||
pass
|
||||
|
||||
return sock
|
||||
|
||||
def _stream_helper(self, response, decode=False):
|
||||
"""Generator for data coming from a chunked-encoded HTTP response."""
|
||||
if response.raw._fp.chunked:
|
||||
reader = response.raw
|
||||
while not reader.closed:
|
||||
# this read call will block until we get a chunk
|
||||
data = reader.read(1)
|
||||
if not data:
|
||||
break
|
||||
if reader._fp.chunk_left:
|
||||
data += reader.read(reader._fp.chunk_left)
|
||||
if decode:
|
||||
if six.PY3:
|
||||
data = data.decode('utf-8')
|
||||
data = json.loads(data)
|
||||
yield data
|
||||
else:
|
||||
# Response isn't chunked, meaning we probably
|
||||
# encountered an error immediately
|
||||
yield self._result(response)
|
||||
|
||||
def _multiplexed_buffer_helper(self, response):
|
||||
"""A generator of multiplexed data blocks read from a buffered
|
||||
response."""
|
||||
buf = self._result(response, binary=True)
|
||||
walker = 0
|
||||
while True:
|
||||
if len(buf[walker:]) < 8:
|
||||
break
|
||||
_, length = struct.unpack_from('>BxxxL', buf[walker:])
|
||||
start = walker + constants.STREAM_HEADER_SIZE_BYTES
|
||||
end = start + length
|
||||
walker = end
|
||||
yield buf[start:end]
|
||||
|
||||
def _multiplexed_response_stream_helper(self, response):
|
||||
"""A generator of multiplexed data blocks coming from a response
|
||||
stream."""
|
||||
|
||||
# Disable timeout on the underlying socket to prevent
|
||||
# Read timed out(s) for long running processes
|
||||
socket = self._get_raw_response_socket(response)
|
||||
if six.PY3:
|
||||
socket._sock.settimeout(None)
|
||||
else:
|
||||
socket.settimeout(None)
|
||||
|
||||
while True:
|
||||
header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
|
||||
if not header:
|
||||
break
|
||||
_, length = struct.unpack('>BxxxL', header)
|
||||
if not length:
|
||||
break
|
||||
data = response.raw.read(length)
|
||||
if not data:
|
||||
break
|
||||
yield data
|
||||
|
||||
def get_adapter(self, url):
|
||||
try:
|
||||
return super(ClientBase, self).get_adapter(url)
|
||||
except requests.exceptions.InvalidSchema as e:
|
||||
if self._custom_adapter:
|
||||
return self._custom_adapter
|
||||
else:
|
||||
raise e
|
||||
|
||||
@property
|
||||
def api_version(self):
|
||||
return self._version
|
Loading…
Reference in New Issue