mirror of https://github.com/docker/docker-py.git
Experimental npipe:// support
Signed-off-by: Joffrey F <joffrey@docker.com>
This commit is contained in:
parent
88811a2659
commit
a34e0cbfaa
|
@ -27,6 +27,10 @@ from . import constants
|
|||
from . import errors
|
||||
from .auth import auth
|
||||
from .unixconn import unixconn
|
||||
try:
|
||||
from .npipeconn import npipeconn
|
||||
except ImportError:
|
||||
pass
|
||||
from .ssladapter import ssladapter
|
||||
from .utils import utils, check_resource, update_headers, kwargs_from_env
|
||||
from .tls import TLSConfig
|
||||
|
@ -64,6 +68,14 @@ class Client(
|
|||
self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
|
||||
self.mount('http+docker://', self._custom_adapter)
|
||||
self.base_url = 'http+docker://localunixsocket'
|
||||
elif base_url.startswith('npipe://'):
|
||||
if not constants.IS_WINDOWS_PLATFORM:
|
||||
raise errors.DockerException(
|
||||
'The npipe:// protocol is only supported on Windows'
|
||||
)
|
||||
self._custom_adapter = npipeconn.NpipeAdapter(base_url, timeout)
|
||||
self.mount('http+docker://', self._custom_adapter)
|
||||
self.base_url = 'http+docker://localnpipe'
|
||||
else:
|
||||
# Use SSLAdapter for the ability to specify SSL version
|
||||
if isinstance(tls, TLSConfig):
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import sys
|
||||
|
||||
DEFAULT_DOCKER_API_VERSION = '1.22'
|
||||
DEFAULT_TIMEOUT_SECONDS = 60
|
||||
STREAM_HEADER_SIZE_BYTES = 8
|
||||
|
@ -8,3 +10,5 @@ CONTAINER_LIMITS_KEYS = [
|
|||
INSECURE_REGISTRY_DEPRECATION_WARNING = \
|
||||
'The `insecure_registry` argument to {} ' \
|
||||
'is deprecated and non-functional. Please remove it.'
|
||||
|
||||
IS_WINDOWS_PLATFORM = (sys.platform == 'win32')
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
from .npipeconn import NpipeAdapter # flake8: noqa
|
|
@ -0,0 +1,80 @@
|
|||
import six
|
||||
import requests.adapters
|
||||
|
||||
from .npipesocket import NpipeSocket
|
||||
|
||||
if six.PY3:
|
||||
import http.client as httplib
|
||||
else:
|
||||
import httplib
|
||||
|
||||
try:
|
||||
import requests.packages.urllib3 as urllib3
|
||||
except ImportError:
|
||||
import urllib3
|
||||
|
||||
|
||||
RecentlyUsedContainer = urllib3._collections.RecentlyUsedContainer
|
||||
|
||||
|
||||
class NpipeHTTPConnection(httplib.HTTPConnection, object):
|
||||
def __init__(self, npipe_path, timeout=60):
|
||||
super(NpipeHTTPConnection, self).__init__(
|
||||
'localhost', timeout=timeout
|
||||
)
|
||||
self.npipe_path = npipe_path
|
||||
self.timeout = timeout
|
||||
|
||||
def connect(self):
|
||||
sock = NpipeSocket()
|
||||
sock.settimeout(self.timeout)
|
||||
sock.connect(self.npipe_path)
|
||||
self.sock = sock
|
||||
|
||||
|
||||
class NpipeHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
|
||||
def __init__(self, npipe_path, timeout=60):
|
||||
super(NpipeHTTPConnectionPool, self).__init__(
|
||||
'localhost', timeout=timeout
|
||||
)
|
||||
self.npipe_path = npipe_path
|
||||
self.timeout = timeout
|
||||
|
||||
def _new_conn(self):
|
||||
return NpipeHTTPConnection(
|
||||
self.npipe_path, self.timeout
|
||||
)
|
||||
|
||||
|
||||
class NpipeAdapter(requests.adapters.HTTPAdapter):
|
||||
def __init__(self, base_url, timeout=60):
|
||||
self.npipe_path = base_url.replace('npipe://', '')
|
||||
self.timeout = timeout
|
||||
self.pools = RecentlyUsedContainer(
|
||||
10, dispose_func=lambda p: p.close()
|
||||
)
|
||||
super(NpipeAdapter, self).__init__()
|
||||
|
||||
def get_connection(self, url, proxies=None):
|
||||
with self.pools.lock:
|
||||
pool = self.pools.get(url)
|
||||
if pool:
|
||||
return pool
|
||||
|
||||
pool = NpipeHTTPConnectionPool(
|
||||
self.npipe_path, self.timeout
|
||||
)
|
||||
self.pools[url] = pool
|
||||
|
||||
return pool
|
||||
|
||||
def request_url(self, request, proxies):
|
||||
# The select_proxy utility in requests errors out when the provided URL
|
||||
# doesn't have a hostname, like is the case when using a UNIX socket.
|
||||
# Since proxies are an irrelevant notion in the case of UNIX sockets
|
||||
# anyway, we simply return the path URL directly.
|
||||
# See also: https://github.com/docker/docker-py/issues/811
|
||||
return request.path_url
|
||||
|
||||
def close(self):
|
||||
self.pools.clear()
|
|
@ -0,0 +1,194 @@
|
|||
import functools
|
||||
import io
|
||||
|
||||
import win32file
|
||||
import win32pipe
|
||||
|
||||
cSECURITY_SQOS_PRESENT = 0x100000
|
||||
cSECURITY_ANONYMOUS = 0
|
||||
cPIPE_READMODE_MESSAGE = 2
|
||||
|
||||
|
||||
def check_closed(f):
|
||||
@functools.wraps(f)
|
||||
def wrapped(self, *args, **kwargs):
|
||||
if self._closed:
|
||||
raise RuntimeError(
|
||||
'Can not reuse socket after connection was closed.'
|
||||
)
|
||||
return f(self, *args, **kwargs)
|
||||
return wrapped
|
||||
|
||||
|
||||
class NpipeSocket(object):
|
||||
""" Partial implementation of the socket API over windows named pipes.
|
||||
This implementation is only designed to be used as a client socket,
|
||||
and server-specific methods (bind, listen, accept...) are not
|
||||
implemented.
|
||||
"""
|
||||
def __init__(self, handle=None):
|
||||
self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
|
||||
self._handle = handle
|
||||
self._closed = False
|
||||
|
||||
def accept(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def bind(self, address):
|
||||
raise NotImplementedError()
|
||||
|
||||
def close(self):
|
||||
self._handle.Close()
|
||||
self._closed = True
|
||||
|
||||
@check_closed
|
||||
def connect(self, address):
|
||||
win32pipe.WaitNamedPipe(address, self._timeout)
|
||||
handle = win32file.CreateFile(
|
||||
address,
|
||||
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
|
||||
0,
|
||||
None,
|
||||
win32file.OPEN_EXISTING,
|
||||
cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT,
|
||||
0
|
||||
)
|
||||
self.flags = win32pipe.GetNamedPipeInfo(handle)[0]
|
||||
# self.state = win32pipe.GetNamedPipeHandleState(handle)[0]
|
||||
|
||||
# if self.state & cPIPE_READMODE_MESSAGE != 0:
|
||||
# raise RuntimeError("message readmode pipes not supported")
|
||||
self._handle = handle
|
||||
self._address = address
|
||||
|
||||
@check_closed
|
||||
def connect_ex(self, address):
|
||||
return self.connect(address)
|
||||
|
||||
@check_closed
|
||||
def detach(self):
|
||||
self._closed = True
|
||||
return self._handle
|
||||
|
||||
@check_closed
|
||||
def dup(self):
|
||||
return NpipeSocket(self._handle)
|
||||
|
||||
@check_closed
|
||||
def fileno(self):
|
||||
return int(self._handle)
|
||||
|
||||
def getpeername(self):
|
||||
return self._address
|
||||
|
||||
def getsockname(self):
|
||||
return self._address
|
||||
|
||||
def getsockopt(self, level, optname, buflen=None):
|
||||
raise NotImplementedError()
|
||||
|
||||
def ioctl(self, control, option):
|
||||
raise NotImplementedError()
|
||||
|
||||
def listen(self, backlog):
|
||||
raise NotImplementedError()
|
||||
|
||||
def makefile(self, mode=None, bufsize=None):
|
||||
if mode.strip('b') != 'r':
|
||||
raise NotImplementedError()
|
||||
rawio = NpipeFileIOBase(self)
|
||||
if bufsize is None:
|
||||
bufsize = io.DEFAULT_BUFFER_SIZE
|
||||
return io.BufferedReader(rawio, buffer_size=bufsize)
|
||||
|
||||
@check_closed
|
||||
def recv(self, bufsize, flags=0):
|
||||
err, data = win32file.ReadFile(self._handle, bufsize)
|
||||
return data
|
||||
|
||||
@check_closed
|
||||
def recvfrom(self, bufsize, flags=0):
|
||||
data = self.recv(bufsize, flags)
|
||||
return (data, self._address)
|
||||
|
||||
@check_closed
|
||||
def recvfrom_into(self, buf, nbytes=0, flags=0):
|
||||
return self.recv_into(buf, nbytes, flags), self._address
|
||||
|
||||
@check_closed
|
||||
def recv_into(self, buf, nbytes=0):
|
||||
readbuf = buf
|
||||
if not isinstance(buf, memoryview):
|
||||
readbuf = memoryview(buf)
|
||||
|
||||
err, data = win32file.ReadFile(
|
||||
self._handle,
|
||||
readbuf[:nbytes] if nbytes else readbuf
|
||||
)
|
||||
return len(data)
|
||||
|
||||
@check_closed
|
||||
def send(self, string, flags=0):
|
||||
err, nbytes = win32file.WriteFile(self._handle, string)
|
||||
return nbytes
|
||||
|
||||
@check_closed
|
||||
def sendall(self, string, flags=0):
|
||||
return self.send(string, flags)
|
||||
|
||||
@check_closed
|
||||
def sendto(self, string, address):
|
||||
self.connect(address)
|
||||
return self.send(string)
|
||||
|
||||
def setblocking(self, flag):
|
||||
if flag:
|
||||
return self.settimeout(None)
|
||||
return self.settimeout(0)
|
||||
|
||||
def settimeout(self, value):
|
||||
if value is None:
|
||||
self._timeout = win32pipe.NMPWAIT_NOWAIT
|
||||
elif not isinstance(value, (float, int)) or value < 0:
|
||||
raise ValueError('Timeout value out of range')
|
||||
elif value == 0:
|
||||
self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
|
||||
else:
|
||||
self._timeout = value
|
||||
|
||||
def gettimeout(self):
|
||||
return self._timeout
|
||||
|
||||
def setsockopt(self, level, optname, value):
|
||||
raise NotImplementedError()
|
||||
|
||||
@check_closed
|
||||
def shutdown(self, how):
|
||||
return self.close()
|
||||
|
||||
|
||||
class NpipeFileIOBase(io.RawIOBase):
|
||||
def __init__(self, npipe_socket):
|
||||
self.sock = npipe_socket
|
||||
|
||||
def close(self):
|
||||
super(NpipeFileIOBase, self).close()
|
||||
self.sock = None
|
||||
|
||||
def fileno(self):
|
||||
return self.sock.fileno()
|
||||
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def readinto(self, buf):
|
||||
return self.sock.recv_into(buf)
|
||||
|
||||
def seekable(self):
|
||||
return False
|
||||
|
||||
def writable(self):
|
||||
return False
|
|
@ -0,0 +1,37 @@
|
|||
import win32pipe
|
||||
import win32file
|
||||
|
||||
import random
|
||||
|
||||
def pipe_name():
|
||||
return 'testpipe{}'.format(random.randint(0, 4096))
|
||||
|
||||
def create_pipe(name):
|
||||
handle = win32pipe.CreateNamedPipe(
|
||||
'//./pipe/{}'.format(name),
|
||||
win32pipe.PIPE_ACCESS_DUPLEX,
|
||||
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
|
||||
128, 4096, 4096, 300, None
|
||||
)
|
||||
return handle
|
||||
|
||||
def rw_loop(pipe):
|
||||
err = win32pipe.ConnectNamedPipe(pipe, None)
|
||||
if err != 0:
|
||||
raise RuntimeError('Error code: {}'.format(err))
|
||||
while True:
|
||||
err, data = win32file.ReadFile(pipe, 4096, None)
|
||||
if err != 0:
|
||||
raise RuntimeError('Error code: {}'.format(err))
|
||||
print('Data received: ', data, len(data))
|
||||
win32file.WriteFile(pipe, b'ACK', None)
|
||||
|
||||
|
||||
def __main__():
|
||||
name = pipe_name()
|
||||
print('Initializing pipe {}'.format(name))
|
||||
pipe = create_pipe(name)
|
||||
print('Pipe created, entering server loop.')
|
||||
rw_loop(pipe)
|
||||
|
||||
__main__()
|
|
@ -413,6 +413,9 @@ def parse_host(addr, platform=None, tls=False):
|
|||
elif addr.startswith('https://'):
|
||||
proto = "https"
|
||||
addr = addr[8:]
|
||||
elif addr.startswith('npipe://'):
|
||||
proto = 'npipe'
|
||||
addr = addr[8:]
|
||||
elif addr.startswith('fd://'):
|
||||
raise errors.DockerException("fd protocol is not implemented")
|
||||
else:
|
||||
|
@ -448,7 +451,7 @@ def parse_host(addr, platform=None, tls=False):
|
|||
else:
|
||||
host = addr
|
||||
|
||||
if proto == "http+unix":
|
||||
if proto == "http+unix" or proto == 'npipe':
|
||||
return "{0}://{1}".format(proto, host)
|
||||
return "{0}://{1}:{2}{3}".format(proto, host, port, path)
|
||||
|
||||
|
|
Loading…
Reference in New Issue