Merge pull request #1632 from kaiyou/master

allow ipv6 :: notation in split_port (using re)
This commit is contained in:
Joffrey F 2017-06-16 11:57:52 -07:00 committed by GitHub
commit 87d5cd1b0a
2 changed files with 46 additions and 67 deletions

View File

@ -1,3 +1,16 @@
import re
PORT_SPEC = re.compile(
"^" # Match full string
"(" # External part
"((?P<host>[a-fA-F\d.:]+):)?" # Address
"(?P<ext>[\d]*)(-(?P<ext_end>[\d]+))?:" # External range
")?"
"(?P<int>[\d]+)(-(?P<int_end>[\d]+))?" # Internal range
"(?P<proto>/(udp|tcp))?" # Protocol
"$" # Match full string
)
def add_port_mapping(port_bindings, internal_port, external):
if internal_port in port_bindings:
@ -24,81 +37,41 @@ def build_port_bindings(ports):
return port_bindings
def to_port_range(port, randomly_available_port=False):
if not port:
return None
protocol = ""
if "/" in port:
parts = port.split("/")
if len(parts) != 2:
_raise_invalid_port(port)
port, protocol = parts
protocol = "/" + protocol
if randomly_available_port:
return ["%s%s" % (port, protocol)]
parts = str(port).split('-')
if len(parts) == 1:
return ["%s%s" % (port, protocol)]
if len(parts) == 2:
full_port_range = range(int(parts[0]), int(parts[1]) + 1)
return ["%s%s" % (p, protocol) for p in full_port_range]
raise ValueError('Invalid port range "%s", should be '
'port or startport-endport' % port)
def _raise_invalid_port(port):
raise ValueError('Invalid port "%s", should be '
'[[remote_ip:]remote_port[-remote_port]:]'
'port[/protocol]' % port)
def port_range(start, end, proto, randomly_available_port=False):
if not start:
return start
if not end:
return [start + proto]
if randomly_available_port:
return ['{}-{}'.format(start, end) + proto]
return [str(port) + proto for port in range(int(start), int(end) + 1)]
def split_port(port):
parts = str(port).split(':')
if not 1 <= len(parts) <= 3:
match = PORT_SPEC.match(port)
if match is None:
_raise_invalid_port(port)
parts = match.groupdict()
if len(parts) == 1:
internal_port, = parts
if not internal_port:
_raise_invalid_port(port)
return to_port_range(internal_port), None
if len(parts) == 2:
external_port, internal_port = parts
host = parts['host']
proto = parts['proto'] or ''
internal = port_range(parts['int'], parts['int_end'], proto)
external = port_range(
parts['ext'], parts['ext_end'], '', len(internal) == 1)
internal_range = to_port_range(internal_port)
if internal_range is None:
_raise_invalid_port(port)
external_range = to_port_range(external_port, len(internal_range) == 1)
if external_range is None:
_raise_invalid_port(port)
if len(internal_range) != len(external_range):
if host is None:
if external is not None and len(internal) != len(external):
raise ValueError('Port ranges don\'t match in length')
return internal_range, external_range
external_ip, external_port, internal_port = parts
if not internal_port:
_raise_invalid_port(port)
internal_range = to_port_range(internal_port)
external_range = to_port_range(external_port, len(internal_range) == 1)
if not external_range:
external_range = [None] * len(internal_range)
if len(internal_range) != len(external_range):
raise ValueError('Port ranges don\'t match in length')
return internal_range, [(external_ip, ex_port or None)
for ex_port in external_range]
return internal, external
else:
if not external:
external = [None] * len(internal)
elif len(internal) != len(external):
raise ValueError('Port ranges don\'t match in length')
return internal, [(host, ext_port) for ext_port in external]

View File

@ -552,6 +552,12 @@ class PortsTest(unittest.TestCase):
self.assertEqual(external_port,
[("127.0.0.1", "1000"), ("127.0.0.1", "1001")])
def test_split_port_with_ipv6_address(self):
internal_port, external_port = split_port(
"2001:abcd:ef00::2:1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("2001:abcd:ef00::2", "1000")])
def test_split_port_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000:tcp"))