docker-py/tests/unit/utils_test.py

965 lines
33 KiB
Python

# -*- coding: utf-8 -*-
import base64
import json
import os
import os.path
import shutil
import sys
import tarfile
import tempfile
import unittest
import pytest
import six
from docker.api.client import APIClient
from docker.constants import IS_WINDOWS_PLATFORM
from docker.errors import DockerException
from docker.utils import (
parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
parse_bytes, parse_env_file, exclude_paths, convert_volume_binds,
decode_json_header, tar, split_command, parse_devices, update_headers,
)
from docker.utils.ports import build_port_bindings, split_port
from docker.utils.utils import (
format_environment, should_check_directory
)
from ..helpers import make_tree
TEST_CERT_DIR = os.path.join(
os.path.dirname(__file__),
'testdata/certs',
)
class DecoratorsTest(unittest.TestCase):
def test_update_headers(self):
sample_headers = {
'X-Docker-Locale': 'en-US',
}
def f(self, headers=None):
return headers
client = APIClient()
client._auth_configs = {}
g = update_headers(f)
assert g(client, headers=None) is None
assert g(client, headers={}) == {}
assert g(client, headers={'Content-type': 'application/json'}) == {
'Content-type': 'application/json',
}
client._auth_configs = {
'HttpHeaders': sample_headers
}
assert g(client, headers=None) == sample_headers
assert g(client, headers={}) == sample_headers
assert g(client, headers={'Content-type': 'application/json'}) == {
'Content-type': 'application/json',
'X-Docker-Locale': 'en-US',
}
class KwargsFromEnvTest(unittest.TestCase):
def setUp(self):
self.os_environ = os.environ.copy()
def tearDown(self):
os.environ = self.os_environ
def test_kwargs_from_env_empty(self):
os.environ.update(DOCKER_HOST='',
DOCKER_CERT_PATH='')
os.environ.pop('DOCKER_TLS_VERIFY', None)
kwargs = kwargs_from_env()
self.assertEqual(None, kwargs.get('base_url'))
self.assertEqual(None, kwargs.get('tls'))
def test_kwargs_from_env_tls(self):
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='1')
kwargs = kwargs_from_env(assert_hostname=False)
self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
self.assertTrue('ca.pem' in kwargs['tls'].ca_cert)
self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
self.assertTrue('key.pem' in kwargs['tls'].cert[1])
self.assertEqual(False, kwargs['tls'].assert_hostname)
self.assertTrue(kwargs['tls'].verify)
try:
client = APIClient(**kwargs)
self.assertEqual(kwargs['base_url'], client.base_url)
self.assertEqual(kwargs['tls'].ca_cert, client.verify)
self.assertEqual(kwargs['tls'].cert, client.cert)
except TypeError as e:
self.fail(e)
def test_kwargs_from_env_tls_verify_false(self):
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY='')
kwargs = kwargs_from_env(assert_hostname=True)
self.assertEqual('https://192.168.59.103:2376', kwargs['base_url'])
self.assertTrue('ca.pem' in kwargs['tls'].ca_cert)
self.assertTrue('cert.pem' in kwargs['tls'].cert[0])
self.assertTrue('key.pem' in kwargs['tls'].cert[1])
self.assertEqual(True, kwargs['tls'].assert_hostname)
self.assertEqual(False, kwargs['tls'].verify)
try:
client = APIClient(**kwargs)
self.assertEqual(kwargs['base_url'], client.base_url)
self.assertEqual(kwargs['tls'].cert, client.cert)
self.assertFalse(kwargs['tls'].verify)
except TypeError as e:
self.fail(e)
def test_kwargs_from_env_tls_verify_false_no_cert(self):
temp_dir = tempfile.mkdtemp()
cert_dir = os.path.join(temp_dir, '.docker')
shutil.copytree(TEST_CERT_DIR, cert_dir)
os.environ.update(DOCKER_HOST='tcp://192.168.59.103:2376',
HOME=temp_dir,
DOCKER_TLS_VERIFY='')
os.environ.pop('DOCKER_CERT_PATH', None)
kwargs = kwargs_from_env(assert_hostname=True)
self.assertEqual('tcp://192.168.59.103:2376', kwargs['base_url'])
def test_kwargs_from_env_no_cert_path(self):
try:
temp_dir = tempfile.mkdtemp()
cert_dir = os.path.join(temp_dir, '.docker')
shutil.copytree(TEST_CERT_DIR, cert_dir)
os.environ.update(HOME=temp_dir,
DOCKER_CERT_PATH='',
DOCKER_TLS_VERIFY='1')
kwargs = kwargs_from_env()
self.assertTrue(kwargs['tls'].verify)
self.assertIn(cert_dir, kwargs['tls'].ca_cert)
self.assertIn(cert_dir, kwargs['tls'].cert[0])
self.assertIn(cert_dir, kwargs['tls'].cert[1])
finally:
if temp_dir:
shutil.rmtree(temp_dir)
def test_kwargs_from_env_alternate_env(self):
# Values in os.environ are entirely ignored if an alternate is
# provided
os.environ.update(
DOCKER_HOST='tcp://192.168.59.103:2376',
DOCKER_CERT_PATH=TEST_CERT_DIR,
DOCKER_TLS_VERIFY=''
)
kwargs = kwargs_from_env(environment={
'DOCKER_HOST': 'http://docker.gensokyo.jp:2581',
})
assert 'http://docker.gensokyo.jp:2581' == kwargs['base_url']
assert 'tls' not in kwargs
class ConverVolumeBindsTest(unittest.TestCase):
def test_convert_volume_binds_empty(self):
self.assertEqual(convert_volume_binds({}), [])
self.assertEqual(convert_volume_binds([]), [])
def test_convert_volume_binds_list(self):
data = ['/a:/a:ro', '/b:/c:z']
self.assertEqual(convert_volume_binds(data), data)
def test_convert_volume_binds_complete(self):
data = {
'/mnt/vol1': {
'bind': '/data',
'mode': 'ro'
}
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:ro'])
def test_convert_volume_binds_compact(self):
data = {
'/mnt/vol1': '/data'
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:rw'])
def test_convert_volume_binds_no_mode(self):
data = {
'/mnt/vol1': {
'bind': '/data'
}
}
self.assertEqual(convert_volume_binds(data), ['/mnt/vol1:/data:rw'])
def test_convert_volume_binds_unicode_bytes_input(self):
expected = [u'/mnt/지연:/unicode/박:rw']
data = {
u'/mnt/지연'.encode('utf-8'): {
'bind': u'/unicode/박'.encode('utf-8'),
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
def test_convert_volume_binds_unicode_unicode_input(self):
expected = [u'/mnt/지연:/unicode/박:rw']
data = {
u'/mnt/지연': {
'bind': u'/unicode/박',
'mode': 'rw'
}
}
self.assertEqual(
convert_volume_binds(data), expected
)
class ParseEnvFileTest(unittest.TestCase):
def generate_tempfile(self, file_content=None):
"""
Generates a temporary file for tests with the content
of 'file_content' and returns the filename.
Don't forget to unlink the file with os.unlink() after.
"""
local_tempfile = tempfile.NamedTemporaryFile(delete=False)
local_tempfile.write(file_content.encode('UTF-8'))
local_tempfile.close()
return local_tempfile.name
def test_parse_env_file_proper(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_env_file_with_equals_character(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\nPASS=sec==ret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'sec==ret'})
os.unlink(env_file)
def test_parse_env_file_commented_line(self):
env_file = self.generate_tempfile(
file_content='USER=jdoe\n#PASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
os.unlink(env_file)
def test_parse_env_file_newline(self):
env_file = self.generate_tempfile(
file_content='\nUSER=jdoe\n\n\nPASS=secret')
get_parse_env_file = parse_env_file(env_file)
self.assertEqual(get_parse_env_file,
{'USER': 'jdoe', 'PASS': 'secret'})
os.unlink(env_file)
def test_parse_env_file_invalid_line(self):
env_file = self.generate_tempfile(
file_content='USER jdoe')
self.assertRaises(
DockerException, parse_env_file, env_file)
os.unlink(env_file)
class ParseHostTest(unittest.TestCase):
def test_parse_host(self):
invalid_hosts = [
'0.0.0.0',
'tcp://',
'udp://127.0.0.1',
'udp://127.0.0.1:2375',
]
valid_hosts = {
'0.0.0.1:5555': 'http://0.0.0.1:5555',
':6666': 'http://127.0.0.1:6666',
'tcp://:7777': 'http://127.0.0.1:7777',
'http://:7777': 'http://127.0.0.1:7777',
'https://kokia.jp:2375': 'https://kokia.jp:2375',
'unix:///var/run/docker.sock': 'http+unix:///var/run/docker.sock',
'unix://': 'http+unix://var/run/docker.sock',
'12.234.45.127:2375/docker/engine': (
'http://12.234.45.127:2375/docker/engine'
),
'somehost.net:80/service/swarm': (
'http://somehost.net:80/service/swarm'
),
'npipe:////./pipe/docker_engine': 'npipe:////./pipe/docker_engine',
'[fd12::82d1]:2375': 'http://[fd12::82d1]:2375',
'https://[fd12:5672::12aa]:1090': 'https://[fd12:5672::12aa]:1090',
'[fd12::82d1]:2375/docker/engine': (
'http://[fd12::82d1]:2375/docker/engine'
),
}
for host in invalid_hosts:
with pytest.raises(DockerException):
parse_host(host, None)
for host, expected in valid_hosts.items():
assert parse_host(host, None) == expected
def test_parse_host_empty_value(self):
unix_socket = 'http+unix://var/run/docker.sock'
npipe = 'npipe:////./pipe/docker_engine'
for val in [None, '']:
assert parse_host(val, is_win32=False) == unix_socket
assert parse_host(val, is_win32=True) == npipe
def test_parse_host_tls(self):
host_value = 'myhost.docker.net:3348'
expected_result = 'https://myhost.docker.net:3348'
assert parse_host(host_value, tls=True) == expected_result
def test_parse_host_tls_tcp_proto(self):
host_value = 'tcp://myhost.docker.net:3348'
expected_result = 'https://myhost.docker.net:3348'
assert parse_host(host_value, tls=True) == expected_result
def test_parse_host_trailing_slash(self):
host_value = 'tcp://myhost.docker.net:2376/'
expected_result = 'http://myhost.docker.net:2376'
assert parse_host(host_value) == expected_result
class ParseRepositoryTagTest(unittest.TestCase):
sha = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
def test_index_image_no_tag(self):
self.assertEqual(
parse_repository_tag("root"), ("root", None)
)
def test_index_image_tag(self):
self.assertEqual(
parse_repository_tag("root:tag"), ("root", "tag")
)
def test_index_user_image_no_tag(self):
self.assertEqual(
parse_repository_tag("user/repo"), ("user/repo", None)
)
def test_index_user_image_tag(self):
self.assertEqual(
parse_repository_tag("user/repo:tag"), ("user/repo", "tag")
)
def test_private_reg_image_no_tag(self):
self.assertEqual(
parse_repository_tag("url:5000/repo"), ("url:5000/repo", None)
)
def test_private_reg_image_tag(self):
self.assertEqual(
parse_repository_tag("url:5000/repo:tag"), ("url:5000/repo", "tag")
)
def test_index_image_sha(self):
self.assertEqual(
parse_repository_tag("root@sha256:{0}".format(self.sha)),
("root", "sha256:{0}".format(self.sha))
)
def test_private_reg_image_sha(self):
self.assertEqual(
parse_repository_tag("url:5000/repo@sha256:{0}".format(self.sha)),
("url:5000/repo", "sha256:{0}".format(self.sha))
)
class ParseDeviceTest(unittest.TestCase):
def test_dict(self):
devices = parse_devices([{
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/mnt1',
'CgroupPermissions': 'r'
}])
self.assertEqual(devices[0], {
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/mnt1',
'CgroupPermissions': 'r'
})
def test_partial_string_definition(self):
devices = parse_devices(['/dev/sda1'])
self.assertEqual(devices[0], {
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/sda1',
'CgroupPermissions': 'rwm'
})
def test_permissionless_string_definition(self):
devices = parse_devices(['/dev/sda1:/dev/mnt1'])
self.assertEqual(devices[0], {
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/mnt1',
'CgroupPermissions': 'rwm'
})
def test_full_string_definition(self):
devices = parse_devices(['/dev/sda1:/dev/mnt1:r'])
self.assertEqual(devices[0], {
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/mnt1',
'CgroupPermissions': 'r'
})
def test_hybrid_list(self):
devices = parse_devices([
'/dev/sda1:/dev/mnt1:rw',
{
'PathOnHost': '/dev/sda2',
'PathInContainer': '/dev/mnt2',
'CgroupPermissions': 'r'
}
])
self.assertEqual(devices[0], {
'PathOnHost': '/dev/sda1',
'PathInContainer': '/dev/mnt1',
'CgroupPermissions': 'rw'
})
self.assertEqual(devices[1], {
'PathOnHost': '/dev/sda2',
'PathInContainer': '/dev/mnt2',
'CgroupPermissions': 'r'
})
class ParseBytesTest(unittest.TestCase):
def test_parse_bytes_valid(self):
self.assertEqual(parse_bytes("512MB"), 536870912)
self.assertEqual(parse_bytes("512M"), 536870912)
self.assertEqual(parse_bytes("512m"), 536870912)
def test_parse_bytes_invalid(self):
self.assertRaises(DockerException, parse_bytes, "512MK")
self.assertRaises(DockerException, parse_bytes, "512L")
self.assertRaises(DockerException, parse_bytes, "127.0.0.1K")
def test_parse_bytes_float(self):
self.assertRaises(DockerException, parse_bytes, "1.5k")
def test_parse_bytes_maxint(self):
self.assertEqual(
parse_bytes("{0}k".format(sys.maxsize)), sys.maxsize * 1024
)
class UtilsTest(unittest.TestCase):
longMessage = True
def test_convert_filters(self):
tests = [
({'dangling': True}, '{"dangling": ["true"]}'),
({'dangling': "true"}, '{"dangling": ["true"]}'),
({'exited': 0}, '{"exited": [0]}'),
({'exited': [0, 1]}, '{"exited": [0, 1]}'),
]
for filters, expected in tests:
self.assertEqual(convert_filters(filters), expected)
def test_decode_json_header(self):
obj = {'a': 'b', 'c': 1}
data = None
if six.PY3:
data = base64.urlsafe_b64encode(bytes(json.dumps(obj), 'utf-8'))
else:
data = base64.urlsafe_b64encode(json.dumps(obj))
decoded_data = decode_json_header(data)
self.assertEqual(obj, decoded_data)
class SplitCommandTest(unittest.TestCase):
def test_split_command_with_unicode(self):
self.assertEqual(split_command(u'echo μμ'), ['echo', 'μμ'])
@pytest.mark.skipif(six.PY3, reason="shlex doesn't support bytes in py3")
def test_split_command_with_bytes(self):
self.assertEqual(split_command('echo μμ'), ['echo', 'μμ'])
class PortsTest(unittest.TestCase):
def test_split_port_with_host_ip(self):
internal_port, external_port = split_port("127.0.0.1:1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("127.0.0.1", "1000")])
def test_split_port_with_protocol(self):
internal_port, external_port = split_port("127.0.0.1:1000:2000/udp")
self.assertEqual(internal_port, ["2000/udp"])
self.assertEqual(external_port, [("127.0.0.1", "1000")])
def test_split_port_with_host_ip_no_port(self):
internal_port, external_port = split_port("127.0.0.1::2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, [("127.0.0.1", None)])
def test_split_port_range_with_host_ip_no_port(self):
internal_port, external_port = split_port("127.0.0.1::2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port,
[("127.0.0.1", None), ("127.0.0.1", None)])
def test_split_port_with_host_port(self):
internal_port, external_port = split_port("1000:2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, ["1000"])
def test_split_port_range_with_host_port(self):
internal_port, external_port = split_port("1000-1001:2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port, ["1000", "1001"])
def test_split_port_no_host_port(self):
internal_port, external_port = split_port("2000")
self.assertEqual(internal_port, ["2000"])
self.assertEqual(external_port, None)
def test_split_port_range_no_host_port(self):
internal_port, external_port = split_port("2000-2001")
self.assertEqual(internal_port, ["2000", "2001"])
self.assertEqual(external_port, None)
def test_split_port_range_with_protocol(self):
internal_port, external_port = split_port(
"127.0.0.1:1000-1001:2000-2001/udp")
self.assertEqual(internal_port, ["2000/udp", "2001/udp"])
self.assertEqual(external_port,
[("127.0.0.1", "1000"), ("127.0.0.1", "1001")])
def test_split_port_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000:tcp"))
def test_non_matching_length_port_ranges(self):
self.assertRaises(
ValueError,
lambda: split_port("0.0.0.0:1000-1010:2000-2002/tcp")
)
def test_port_and_range_invalid(self):
self.assertRaises(ValueError,
lambda: split_port("0.0.0.0:1000:2000-2002/tcp"))
def test_port_only_with_colon(self):
self.assertRaises(ValueError,
lambda: split_port(":80"))
def test_host_only_with_colon(self):
self.assertRaises(ValueError,
lambda: split_port("localhost:"))
def test_build_port_bindings_with_one_port(self):
port_bindings = build_port_bindings(["127.0.0.1:1000:1000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
def test_build_port_bindings_with_matching_internal_ports(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:1000"])
self.assertEqual(port_bindings["1000"],
[("127.0.0.1", "1000"), ("127.0.0.1", "2000")])
def test_build_port_bindings_with_nonmatching_internal_ports(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])
def test_build_port_bindings_with_port_range(self):
port_bindings = build_port_bindings(["127.0.0.1:1000-1001:1000-1001"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["1001"], [("127.0.0.1", "1001")])
def test_build_port_bindings_with_matching_internal_port_ranges(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000-1001:1000-1001", "127.0.0.1:2000-2001:1000-1001"])
self.assertEqual(port_bindings["1000"],
[("127.0.0.1", "1000"), ("127.0.0.1", "2000")])
self.assertEqual(port_bindings["1001"],
[("127.0.0.1", "1001"), ("127.0.0.1", "2001")])
def test_build_port_bindings_with_nonmatching_internal_port_ranges(self):
port_bindings = build_port_bindings(
["127.0.0.1:1000:1000", "127.0.0.1:2000:2000"])
self.assertEqual(port_bindings["1000"], [("127.0.0.1", "1000")])
self.assertEqual(port_bindings["2000"], [("127.0.0.1", "2000")])
def convert_paths(collection):
if not IS_WINDOWS_PLATFORM:
return collection
return set(map(lambda x: x.replace('/', '\\'), collection))
class ExcludePathsTest(unittest.TestCase):
dirs = [
'foo',
'foo/bar',
'bar',
]
files = [
'Dockerfile',
'Dockerfile.alt',
'.dockerignore',
'a.py',
'a.go',
'b.py',
'cde.py',
'foo/a.py',
'foo/b.py',
'foo/bar/a.py',
'bar/a.py',
'foo/Dockerfile3',
]
all_paths = set(dirs + files)
def setUp(self):
self.base = make_tree(self.dirs, self.files)
def tearDown(self):
shutil.rmtree(self.base)
def exclude(self, patterns, dockerfile=None):
return set(exclude_paths(self.base, patterns, dockerfile=dockerfile))
def test_no_excludes(self):
assert self.exclude(['']) == convert_paths(self.all_paths)
def test_no_dupes(self):
paths = exclude_paths(self.base, ['!a.py'])
assert sorted(paths) == sorted(set(paths))
def test_wildcard_exclude(self):
assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore'])
def test_exclude_dockerfile_dockerignore(self):
"""
Even if the .dockerignore file explicitly says to exclude
Dockerfile and/or .dockerignore, don't exclude them from
the actual tar file.
"""
assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths(
self.all_paths
)
def test_exclude_custom_dockerfile(self):
"""
If we're using a custom Dockerfile, make sure that's not
excluded.
"""
assert self.exclude(['*'], dockerfile='Dockerfile.alt') == \
set(['Dockerfile.alt', '.dockerignore'])
assert self.exclude(['*'], dockerfile='foo/Dockerfile3') == \
set(['foo/Dockerfile3', '.dockerignore'])
def test_exclude_dockerfile_child(self):
includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3')
assert 'foo/Dockerfile3' in includes
assert 'foo/a.py' not in includes
def test_single_filename(self):
assert self.exclude(['a.py']) == convert_paths(
self.all_paths - set(['a.py'])
)
def test_single_filename_leading_dot_slash(self):
assert self.exclude(['./a.py']) == convert_paths(
self.all_paths - set(['a.py'])
)
# As odd as it sounds, a filename pattern with a trailing slash on the
# end *will* result in that file being excluded.
def test_single_filename_trailing_slash(self):
assert self.exclude(['a.py/']) == convert_paths(
self.all_paths - set(['a.py'])
)
def test_wildcard_filename_start(self):
assert self.exclude(['*.py']) == convert_paths(
self.all_paths - set(['a.py', 'b.py', 'cde.py'])
)
def test_wildcard_with_exception(self):
assert self.exclude(['*.py', '!b.py']) == convert_paths(
self.all_paths - set(['a.py', 'cde.py'])
)
def test_wildcard_with_wildcard_exception(self):
assert self.exclude(['*.*', '!*.go']) == convert_paths(
self.all_paths - set([
'a.py', 'b.py', 'cde.py', 'Dockerfile.alt',
])
)
def test_wildcard_filename_end(self):
assert self.exclude(['a.*']) == convert_paths(
self.all_paths - set(['a.py', 'a.go'])
)
def test_question_mark(self):
assert self.exclude(['?.py']) == convert_paths(
self.all_paths - set(['a.py', 'b.py'])
)
def test_single_subdir_single_filename(self):
assert self.exclude(['foo/a.py']) == convert_paths(
self.all_paths - set(['foo/a.py'])
)
def test_single_subdir_with_path_traversal(self):
assert self.exclude(['foo/whoops/../a.py']) == convert_paths(
self.all_paths - set(['foo/a.py'])
)
def test_single_subdir_wildcard_filename(self):
assert self.exclude(['foo/*.py']) == convert_paths(
self.all_paths - set(['foo/a.py', 'foo/b.py'])
)
def test_wildcard_subdir_single_filename(self):
assert self.exclude(['*/a.py']) == convert_paths(
self.all_paths - set(['foo/a.py', 'bar/a.py'])
)
def test_wildcard_subdir_wildcard_filename(self):
assert self.exclude(['*/*.py']) == convert_paths(
self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py'])
)
def test_directory(self):
assert self.exclude(['foo']) == convert_paths(
self.all_paths - set([
'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py',
'foo/Dockerfile3'
])
)
def test_directory_with_trailing_slash(self):
assert self.exclude(['foo']) == convert_paths(
self.all_paths - set([
'foo', 'foo/a.py', 'foo/b.py',
'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3'
])
)
def test_directory_with_single_exception(self):
assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths(
self.all_paths - set([
'foo/a.py', 'foo/b.py', 'foo', 'foo/bar',
'foo/Dockerfile3'
])
)
def test_directory_with_subdir_exception(self):
assert self.exclude(['foo', '!foo/bar']) == convert_paths(
self.all_paths - set([
'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
])
)
def test_directory_with_wildcard_exception(self):
assert self.exclude(['foo', '!foo/*.py']) == convert_paths(
self.all_paths - set([
'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3'
])
)
def test_subdirectory(self):
assert self.exclude(['foo/bar']) == convert_paths(
self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
)
class TarTest(unittest.TestCase):
def test_tar_with_excludes(self):
dirs = [
'foo',
'foo/bar',
'bar',
]
files = [
'Dockerfile',
'Dockerfile.alt',
'.dockerignore',
'a.py',
'a.go',
'b.py',
'cde.py',
'foo/a.py',
'foo/b.py',
'foo/bar/a.py',
'bar/a.py',
]
exclude = [
'*.py',
'!b.py',
'!a.go',
'foo',
'Dockerfile*',
'.dockerignore',
]
expected_names = set([
'Dockerfile',
'.dockerignore',
'a.go',
'b.py',
'bar',
'bar/a.py',
])
base = make_tree(dirs, files)
self.addCleanup(shutil.rmtree, base)
with tar(base, exclude=exclude) as archive:
tar_data = tarfile.open(fileobj=archive)
assert sorted(tar_data.getnames()) == sorted(expected_names)
def test_tar_with_empty_directory(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(sorted(tar_data.getnames()), ['bar', 'foo'])
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
def test_tar_with_file_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
with open(os.path.join(base, 'foo'), 'w') as f:
f.write("content")
os.makedirs(os.path.join(base, 'bar'))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)
@pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
def test_tar_with_directory_symlinks(self):
base = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, base)
for d in ['foo', 'bar']:
os.makedirs(os.path.join(base, d))
os.symlink('../foo', os.path.join(base, 'bar/foo'))
with tar(base) as archive:
tar_data = tarfile.open(fileobj=archive)
self.assertEqual(
sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
)
class ShouldCheckDirectoryTest(unittest.TestCase):
exclude_patterns = [
'exclude_rather_large_directory',
'dir/with/subdir_excluded',
'dir/with/exceptions'
]
include_patterns = [
'dir/with/exceptions/like_this_one',
'dir/with/exceptions/in/descendents'
]
def test_should_check_directory_not_excluded(self):
self.assertTrue(
should_check_directory('not_excluded', self.exclude_patterns,
self.include_patterns)
)
self.assertTrue(
should_check_directory('dir/with', self.exclude_patterns,
self.include_patterns)
)
def test_shoud_check_parent_directories_of_excluded(self):
self.assertTrue(
should_check_directory('dir', self.exclude_patterns,
self.include_patterns)
)
self.assertTrue(
should_check_directory('dir/with', self.exclude_patterns,
self.include_patterns)
)
def test_should_not_check_excluded_directories_with_no_exceptions(self):
self.assertFalse(
should_check_directory('exclude_rather_large_directory',
self.exclude_patterns, self.include_patterns
)
)
self.assertFalse(
should_check_directory('dir/with/subdir_excluded',
self.exclude_patterns, self.include_patterns
)
)
def test_should_check_excluded_directory_with_exceptions(self):
self.assertTrue(
should_check_directory('dir/with/exceptions',
self.exclude_patterns, self.include_patterns
)
)
self.assertTrue(
should_check_directory('dir/with/exceptions/in',
self.exclude_patterns, self.include_patterns
)
)
def test_should_not_check_siblings_of_exceptions(self):
self.assertFalse(
should_check_directory('dir/with/exceptions/but_not_here',
self.exclude_patterns, self.include_patterns
)
)
def test_should_check_subdirectories_of_exceptions(self):
self.assertTrue(
should_check_directory('dir/with/exceptions/like_this_one/subdir',
self.exclude_patterns, self.include_patterns
)
)
class FormatEnvironmentTest(unittest.TestCase):
def test_format_env_binary_unicode_value(self):
env_dict = {
'ARTIST_NAME': b'\xec\x86\xa1\xec\xa7\x80\xec\x9d\x80'
}
assert format_environment(env_dict) == [u'ARTIST_NAME=송지은']
def test_format_env_no_value(self):
env_dict = {
'FOO': None,
'BAR': '',
}
assert sorted(format_environment(env_dict)) == ['BAR=', 'FOO']