summaryrefslogtreecommitdiff
path: root/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit')
-rw-r--r--tests/unit/api_container_test.py4
-rw-r--r--tests/unit/api_test.py60
-rw-r--r--tests/unit/auth_test.py66
-rw-r--r--tests/unit/fake_api_client.py16
-rw-r--r--tests/unit/models_containers_test.py12
-rw-r--r--tests/unit/utils_build_test.py493
-rw-r--r--tests/unit/utils_test.py480
7 files changed, 634 insertions, 497 deletions
diff --git a/tests/unit/api_container_test.py b/tests/unit/api_container_test.py
index c33f129..a7e183c 100644
--- a/tests/unit/api_container_test.py
+++ b/tests/unit/api_container_test.py
@@ -1335,7 +1335,7 @@ class ContainerTest(BaseAPIClientTest):
'POST',
url_prefix + 'containers/3cc2351ab11b/restart',
params={'t': 2},
- timeout=DEFAULT_TIMEOUT_SECONDS
+ timeout=(DEFAULT_TIMEOUT_SECONDS + 2)
)
def test_restart_container_with_dict_instead_of_id(self):
@@ -1345,7 +1345,7 @@ class ContainerTest(BaseAPIClientTest):
'POST',
url_prefix + 'containers/3cc2351ab11b/restart',
params={'t': 2},
- timeout=DEFAULT_TIMEOUT_SECONDS
+ timeout=(DEFAULT_TIMEOUT_SECONDS + 2)
)
def test_remove_container(self):
diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py
index 46cbd68..af2bb1c 100644
--- a/tests/unit/api_test.py
+++ b/tests/unit/api_test.py
@@ -44,7 +44,7 @@ def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
return res
-def fake_resolve_authconfig(authconfig, registry=None):
+def fake_resolve_authconfig(authconfig, registry=None, *args, **kwargs):
return None
@@ -365,7 +365,7 @@ class DockerApiTest(BaseAPIClientTest):
assert result == content
-class StreamTest(unittest.TestCase):
+class UnixSocketStreamTest(unittest.TestCase):
def setUp(self):
socket_dir = tempfile.mkdtemp()
self.build_context = tempfile.mkdtemp()
@@ -462,7 +462,61 @@ class StreamTest(unittest.TestCase):
raise e
assert list(stream) == [
- str(i).encode() for i in range(50)]
+ str(i).encode() for i in range(50)
+ ]
+
+
+class TCPSocketStreamTest(unittest.TestCase):
+ text_data = b'''
+ Now, those children out there, they're jumping through the
+ flames in the hope that the god of the fire will make them fruitful.
+ Really, you can't blame them. After all, what girl would not prefer the
+ child of a god to that of some acne-scarred artisan?
+ '''
+
+ def setUp(self):
+
+ self.server = six.moves.socketserver.ThreadingTCPServer(
+ ('', 0), self.get_handler_class()
+ )
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.setDaemon(True)
+ self.thread.start()
+ self.address = 'http://{}:{}'.format(
+ socket.gethostname(), self.server.server_address[1]
+ )
+
+ def tearDown(self):
+ self.server.shutdown()
+ self.server.server_close()
+ self.thread.join()
+
+ def get_handler_class(self):
+ text_data = self.text_data
+
+ class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def do_POST(self):
+ self.send_response(101)
+ self.send_header(
+ 'Content-Type', 'application/vnd.docker.raw-stream'
+ )
+ self.send_header('Connection', 'Upgrade')
+ self.send_header('Upgrade', 'tcp')
+ self.end_headers()
+ self.wfile.flush()
+ time.sleep(0.2)
+ self.wfile.write(text_data)
+ self.wfile.flush()
+
+ return Handler
+
+ def test_read_from_socket(self):
+ with APIClient(base_url=self.address) as client:
+ resp = client._post(client._url('/dummy'), stream=True)
+ data = client._read_from_socket(resp, stream=True, tty=True)
+ results = b''.join(data)
+
+ assert results == self.text_data
class UserAgentTest(unittest.TestCase):
diff --git a/tests/unit/auth_test.py b/tests/unit/auth_test.py
index ee32ca0..947d680 100644
--- a/tests/unit/auth_test.py
+++ b/tests/unit/auth_test.py
@@ -282,22 +282,64 @@ class LoadConfigTest(unittest.TestCase):
cfg = auth.load_config(folder)
assert cfg is not None
- def test_load_config(self):
+ def test_load_legacy_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
- dockercfg_path = os.path.join(folder, '.dockercfg')
- with open(dockercfg_path, 'w') as f:
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ cfg_path = os.path.join(folder, '.dockercfg')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ with open(cfg_path, 'w') as f:
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
- cfg = auth.load_config(dockercfg_path)
- assert auth.INDEX_NAME in cfg
- assert cfg[auth.INDEX_NAME] is not None
- cfg = cfg[auth.INDEX_NAME]
+
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg['auths'][auth.INDEX_NAME] is not None
+ cfg = cfg['auths'][auth.INDEX_NAME]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net'
- assert cfg.get('auth') is None
+ assert cfg.get('Auth') is None
+
+ def test_load_json_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email = 'sakuya@scarlet.net'
+ with open(cfg_path, 'w') as f:
+ json.dump(
+ {auth.INDEX_URL: {'auth': auth_, 'email': email}}, f
+ )
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg['auths'][auth.INDEX_URL] is not None
+ cfg = cfg['auths'][auth.INDEX_URL]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == email
+ assert cfg.get('Auth') is None
+
+ def test_load_modern_json_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg_path = os.path.join(folder, 'config.json')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email = 'sakuya@scarlet.net'
+ with open(cfg_path, 'w') as f:
+ json.dump({
+ 'auths': {
+ auth.INDEX_URL: {
+ 'auth': auth_, 'email': email
+ }
+ }
+ }, f)
+ cfg = auth.load_config(cfg_path)
+ assert auth.resolve_authconfig(cfg) is not None
+ assert cfg['auths'][auth.INDEX_URL] is not None
+ cfg = cfg['auths'][auth.INDEX_URL]
+ assert cfg['username'] == 'sakuya'
+ assert cfg['password'] == 'izayoi'
+ assert cfg['email'] == email
def test_load_config_with_random_name(self):
folder = tempfile.mkdtemp()
@@ -318,7 +360,7 @@ class LoadConfigTest(unittest.TestCase):
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
- cfg = auth.load_config(dockercfg_path)
+ cfg = auth.load_config(dockercfg_path)['auths']
assert registry in cfg
assert cfg[registry] is not None
cfg = cfg[registry]
@@ -345,7 +387,7 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
- cfg = auth.load_config(None)
+ cfg = auth.load_config(None)['auths']
assert registry in cfg
assert cfg[registry] is not None
cfg = cfg[registry]
@@ -422,7 +464,7 @@ class LoadConfigTest(unittest.TestCase):
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
- assert cfg == {}
+ assert cfg == {'auths': {}}
def test_load_config_invalid_auth_dict(self):
folder = tempfile.mkdtemp()
diff --git a/tests/unit/fake_api_client.py b/tests/unit/fake_api_client.py
index 15b60ea..2147bfd 100644
--- a/tests/unit/fake_api_client.py
+++ b/tests/unit/fake_api_client.py
@@ -20,15 +20,18 @@ class CopyReturnMagicMock(mock.MagicMock):
return ret
-def make_fake_api_client():
+def make_fake_api_client(overrides=None):
"""
Returns non-complete fake APIClient.
This returns most of the default cases correctly, but most arguments that
change behaviour will not work.
"""
+
+ if overrides is None:
+ overrides = {}
api_client = docker.APIClient()
- mock_client = CopyReturnMagicMock(**{
+ mock_attrs = {
'build.return_value': fake_api.FAKE_IMAGE_ID,
'commit.return_value': fake_api.post_fake_commit()[1],
'containers.return_value': fake_api.get_fake_containers()[1],
@@ -47,15 +50,18 @@ def make_fake_api_client():
'networks.return_value': fake_api.get_fake_network_list()[1],
'start.return_value': None,
'wait.return_value': {'StatusCode': 0},
- })
+ }
+ mock_attrs.update(overrides)
+ mock_client = CopyReturnMagicMock(**mock_attrs)
+
mock_client._version = docker.constants.DEFAULT_DOCKER_API_VERSION
return mock_client
-def make_fake_client():
+def make_fake_client(overrides=None):
"""
Returns a Client with a fake APIClient.
"""
client = docker.DockerClient()
- client.api = make_fake_api_client()
+ client.api = make_fake_api_client(overrides)
return client
diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py
index 2b0b499..48a5288 100644
--- a/tests/unit/models_containers_test.py
+++ b/tests/unit/models_containers_test.py
@@ -359,6 +359,18 @@ class ContainerCollectionTest(unittest.TestCase):
assert isinstance(containers[0], Container)
assert containers[0].id == FAKE_CONTAINER_ID
+ def test_list_ignore_removed(self):
+ def side_effect(*args, **kwargs):
+ raise docker.errors.NotFound('Container not found')
+ client = make_fake_client({
+ 'inspect_container.side_effect': side_effect
+ })
+
+ with pytest.raises(docker.errors.NotFound):
+ client.containers.list(all=True, ignore_removed=False)
+
+ assert client.containers.list(all=True, ignore_removed=True) == []
+
class ContainerTest(unittest.TestCase):
def test_name(self):
diff --git a/tests/unit/utils_build_test.py b/tests/unit/utils_build_test.py
new file mode 100644
index 0000000..012f15b
--- /dev/null
+++ b/tests/unit/utils_build_test.py
@@ -0,0 +1,493 @@
+# -*- coding: utf-8 -*-
+
+import os
+import os.path
+import shutil
+import socket
+import tarfile
+import tempfile
+import unittest
+
+
+from docker.constants import IS_WINDOWS_PLATFORM
+from docker.utils import exclude_paths, tar
+
+import pytest
+
+from ..helpers import make_tree
+
+
+def convert_paths(collection):
+ return set(map(convert_path, collection))
+
+
+def convert_path(path):
+ return path.replace('/', os.path.sep)
+
+
+class ExcludePathsTest(unittest.TestCase):
+ dirs = [
+ 'foo',
+ 'foo/bar',
+ 'bar',
+ 'target',
+ 'target/subdir',
+ 'subdir',
+ 'subdir/target',
+ 'subdir/target/subdir',
+ 'subdir/subdir2',
+ 'subdir/subdir2/target',
+ 'subdir/subdir2/target/subdir'
+ ]
+
+ files = [
+ 'Dockerfile',
+ 'Dockerfile.alt',
+ '.dockerignore',
+ 'a.py',
+ 'a.go',
+ 'b.py',
+ 'cde.py',
+ 'foo/a.py',
+ 'foo/b.py',
+ 'foo/bar/a.py',
+ 'bar/a.py',
+ 'foo/Dockerfile3',
+ 'target/file.txt',
+ 'target/subdir/file.txt',
+ 'subdir/file.txt',
+ 'subdir/target/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/file.txt',
+ 'subdir/subdir2/target/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt',
+ ]
+
+ all_paths = set(dirs + files)
+
+ def setUp(self):
+ self.base = make_tree(self.dirs, self.files)
+
+ def tearDown(self):
+ shutil.rmtree(self.base)
+
+ def exclude(self, patterns, dockerfile=None):
+ return set(exclude_paths(self.base, patterns, dockerfile=dockerfile))
+
+ def test_no_excludes(self):
+ assert self.exclude(['']) == convert_paths(self.all_paths)
+
+ def test_no_dupes(self):
+ paths = exclude_paths(self.base, ['!a.py'])
+ assert sorted(paths) == sorted(set(paths))
+
+ def test_wildcard_exclude(self):
+ assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore'])
+
+ def test_exclude_dockerfile_dockerignore(self):
+ """
+ Even if the .dockerignore file explicitly says to exclude
+ Dockerfile and/or .dockerignore, don't exclude them from
+ the actual tar file.
+ """
+ assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths(
+ self.all_paths
+ )
+
+ def test_exclude_custom_dockerfile(self):
+ """
+ If we're using a custom Dockerfile, make sure that's not
+ excluded.
+ """
+ assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set(
+ ['Dockerfile.alt', '.dockerignore']
+ )
+
+ assert self.exclude(
+ ['*'], dockerfile='foo/Dockerfile3'
+ ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
+
+ # https://github.com/docker/docker-py/issues/1956
+ assert self.exclude(
+ ['*'], dockerfile='./foo/Dockerfile3'
+ ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
+
+ def test_exclude_dockerfile_child(self):
+ includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3')
+ assert convert_path('foo/Dockerfile3') in includes
+ assert convert_path('foo/a.py') not in includes
+
+ def test_single_filename(self):
+ assert self.exclude(['a.py']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ def test_single_filename_leading_dot_slash(self):
+ assert self.exclude(['./a.py']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ # As odd as it sounds, a filename pattern with a trailing slash on the
+ # end *will* result in that file being excluded.
+ def test_single_filename_trailing_slash(self):
+ assert self.exclude(['a.py/']) == convert_paths(
+ self.all_paths - set(['a.py'])
+ )
+
+ def test_wildcard_filename_start(self):
+ assert self.exclude(['*.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'b.py', 'cde.py'])
+ )
+
+ def test_wildcard_with_exception(self):
+ assert self.exclude(['*.py', '!b.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'cde.py'])
+ )
+
+ def test_wildcard_with_wildcard_exception(self):
+ assert self.exclude(['*.*', '!*.go']) == convert_paths(
+ self.all_paths - set([
+ 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt',
+ ])
+ )
+
+ def test_wildcard_filename_end(self):
+ assert self.exclude(['a.*']) == convert_paths(
+ self.all_paths - set(['a.py', 'a.go'])
+ )
+
+ def test_question_mark(self):
+ assert self.exclude(['?.py']) == convert_paths(
+ self.all_paths - set(['a.py', 'b.py'])
+ )
+
+ def test_single_subdir_single_filename(self):
+ assert self.exclude(['foo/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_single_subdir_single_filename_leading_slash(self):
+ assert self.exclude(['/foo/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_exclude_include_absolute_path(self):
+ base = make_tree([], ['a.py', 'b.py'])
+ assert exclude_paths(
+ base,
+ ['/*', '!/*.py']
+ ) == set(['a.py', 'b.py'])
+
+ def test_single_subdir_with_path_traversal(self):
+ assert self.exclude(['foo/whoops/../a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py'])
+ )
+
+ def test_single_subdir_wildcard_filename(self):
+ assert self.exclude(['foo/*.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'foo/b.py'])
+ )
+
+ def test_wildcard_subdir_single_filename(self):
+ assert self.exclude(['*/a.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'bar/a.py'])
+ )
+
+ def test_wildcard_subdir_wildcard_filename(self):
+ assert self.exclude(['*/*.py']) == convert_paths(
+ self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py'])
+ )
+
+ def test_directory(self):
+ assert self.exclude(['foo']) == convert_paths(
+ self.all_paths - set([
+ 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py',
+ 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_trailing_slash(self):
+ assert self.exclude(['foo']) == convert_paths(
+ self.all_paths - set([
+ 'foo', 'foo/a.py', 'foo/b.py',
+ 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_single_exception(self):
+ assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar',
+ 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_subdir_exception(self):
+ assert self.exclude(['foo', '!foo/bar']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ @pytest.mark.skipif(
+ not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
+ )
+ def test_directory_with_subdir_exception_win32_pathsep(self):
+ assert self.exclude(['foo', '!foo\\bar']) == convert_paths(
+ self.all_paths - set([
+ 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_directory_with_wildcard_exception(self):
+ assert self.exclude(['foo', '!foo/*.py']) == convert_paths(
+ self.all_paths - set([
+ 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3'
+ ])
+ )
+
+ def test_subdirectory(self):
+ assert self.exclude(['foo/bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ @pytest.mark.skipif(
+ not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
+ )
+ def test_subdirectory_win32_pathsep(self):
+ assert self.exclude(['foo\\bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ def test_double_wildcard(self):
+ assert self.exclude(['**/a.py']) == convert_paths(
+ self.all_paths - set(
+ ['a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py']
+ )
+ )
+
+ assert self.exclude(['foo/**/bar']) == convert_paths(
+ self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
+ )
+
+ def test_single_and_double_wildcard(self):
+ assert self.exclude(['**/target/*/*']) == convert_paths(
+ self.all_paths - set(
+ ['target/subdir/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt']
+ )
+ )
+
+ def test_trailing_double_wildcard(self):
+ assert self.exclude(['subdir/**']) == convert_paths(
+ self.all_paths - set(
+ ['subdir/file.txt',
+ 'subdir/target/file.txt',
+ 'subdir/target/subdir/file.txt',
+ 'subdir/subdir2/file.txt',
+ 'subdir/subdir2/target/file.txt',
+ 'subdir/subdir2/target/subdir/file.txt',
+ 'subdir/target',
+ 'subdir/target/subdir',
+ 'subdir/subdir2',
+ 'subdir/subdir2/target',
+ 'subdir/subdir2/target/subdir']
+ )
+ )
+
+ def test_double_wildcard_with_exception(self):
+ assert self.exclude(['**', '!bar', '!foo/bar']) == convert_paths(
+ set([
+ 'foo/bar', 'foo/bar/a.py', 'bar', 'bar/a.py', 'Dockerfile',
+ '.dockerignore',
+ ])
+ )
+
+ def test_include_wildcard(self):
+ # This may be surprising but it matches the CLI's behavior
+ # (tested with 18.05.0-ce on linux)
+ base = make_tree(['a'], ['a/b.py'])
+ assert exclude_paths(
+ base,
+ ['*', '!*/b.py']
+ ) == set()
+
+ def test_last_line_precedence(self):
+ base = make_tree(
+ [],
+ ['garbage.md',
+ 'trash.md',
+ 'README.md',
+ 'README-bis.md',
+ 'README-secret.md'])
+ assert exclude_paths(
+ base,
+ ['*.md', '!README*.md', 'README-secret.md']
+ ) == set(['README.md', 'README-bis.md'])
+
+ def test_parent_directory(self):
+ base = make_tree(
+ [],
+ ['a.py',
+ 'b.py',
+ 'c.py'])
+ # Dockerignore reference stipulates that absolute paths are
+ # equivalent to relative paths, hence /../foo should be
+ # equivalent to ../foo. It also stipulates that paths are run
+ # through Go's filepath.Clean, which explicitely "replace
+ # "/.." by "/" at the beginning of a path".
+ assert exclude_paths(
+ base,
+ ['../a.py', '/../b.py']
+ ) == set(['c.py'])
+
+
+class TarTest(unittest.TestCase):
+ def test_tar_with_excludes(self):
+ dirs = [
+ 'foo',
+ 'foo/bar',
+ 'bar',
+ ]
+
+ files = [
+ 'Dockerfile',
+ 'Dockerfile.alt',
+ '.dockerignore',
+ 'a.py',
+ 'a.go',
+ 'b.py',
+ 'cde.py',
+ 'foo/a.py',
+ 'foo/b.py',
+ 'foo/bar/a.py',
+ 'bar/a.py',
+ ]
+
+ exclude = [
+ '*.py',
+ '!b.py',
+ '!a.go',
+ 'foo',
+ 'Dockerfile*',
+ '.dockerignore',
+ ]
+
+ expected_names = set([
+ 'Dockerfile',
+ '.dockerignore',
+ 'a.go',
+ 'b.py',
+ 'bar',
+ 'bar/a.py',
+ ])
+
+ base = make_tree(dirs, files)
+ self.addCleanup(shutil.rmtree, base)
+
+ with tar(base, exclude=exclude) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == sorted(expected_names)
+
+ def test_tar_with_empty_directory(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'foo']
+
+ @pytest.mark.skipif(
+ IS_WINDOWS_PLATFORM or os.geteuid() == 0,
+ reason='root user always has access ; no chmod on Windows'
+ )
+ def test_tar_with_inaccessible_file(self):
+ base = tempfile.mkdtemp()
+ full_path = os.path.join(base, 'foo')
+ self.addCleanup(shutil.rmtree, base)
+ with open(full_path, 'w') as f:
+ f.write('content')
+ os.chmod(full_path, 0o222)
+ with pytest.raises(IOError) as ei:
+ tar(base)
+
+ assert 'Can not read file in context: {}'.format(full_path) in (
+ ei.exconly()
+ )
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_file_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ with open(os.path.join(base, 'foo'), 'w') as f:
+ f.write("content")
+ os.makedirs(os.path.join(base, 'bar'))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_directory_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_with_broken_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+
+ os.symlink('../baz', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32')
+ def test_tar_socket_file(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ sock = socket.socket(socket.AF_UNIX)
+ self.addCleanup(sock.close)
+ sock.bind(os.path.join(base, 'test.sock'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == ['bar', 'foo']
+
+ def tar_test_negative_mtime_bug(self):
+ base = tempfile.mkdtemp()
+ filename = os.path.join(base, 'th.txt')
+ self.addCleanup(shutil.rmtree, base)
+ with open(filename, 'w') as f:
+ f.write('Invisible Full Moon')
+ os.utime(filename, (12345, -3600.0))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert tar_data.getnames() == ['th.txt']
+ assert tar_data.getmember('th.txt').mtime == -3600
+
+ @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
+ def test_tar_directory_link(self):
+ dirs = ['a', 'b', 'a/c']
+ files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py']
+ base = make_tree(dirs, files)
+ self.addCleanup(shutil.rmtree, base)
+ os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ names = tar_data.getnames()
+ for member in dirs + files:
+ assert member in names
+ assert 'a/c/b' in names
+ assert 'a/c/b/utils.py' not in names
diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py
index 00456e8..8880cfe 100644
--- a/tests/unit/utils_test.py
+++ b/tests/unit/utils_test.py
@@ -5,29 +5,25 @@ import json
import os
import os.path
import shutil
-import socket
import sys
-import tarfile
import tempfile
import unittest
-import pytest
-import six
from docker.api.client import APIClient
-from docker.constants import IS_WINDOWS_PLATFORM
from docker.errors import DockerException
from docker.utils import (
- parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
- parse_bytes, parse_env_file, exclude_paths, convert_volume_binds,
- decode_json_header, tar, split_command, parse_devices, update_headers,
+ convert_filters, convert_volume_binds, decode_json_header, kwargs_from_env,
+ parse_bytes, parse_devices, parse_env_file, parse_host,
+ parse_repository_tag, split_command, update_headers,
)
from docker.utils.ports import build_port_bindings, split_port
from docker.utils.utils import format_environment
-from ..helpers import make_tree
+import pytest
+import six
TEST_CERT_DIR = os.path.join(
os.path.dirname(__file__),
@@ -608,472 +604,6 @@ class PortsTest(unittest.TestCase):
assert port_bindings["2000"] == [("127.0.0.1", "2000")]
-def convert_paths(collection):
- return set(map(convert_path, collection))
-
-
-def convert_path(path):
- return path.replace('/', os.path.sep)
-
-
-class ExcludePathsTest(unittest.TestCase):
- dirs = [
- 'foo',
- 'foo/bar',
- 'bar',
- 'target',
- 'target/subdir',
- 'subdir',
- 'subdir/target',
- 'subdir/target/subdir',
- 'subdir/subdir2',
- 'subdir/subdir2/target',
- 'subdir/subdir2/target/subdir'
- ]
-
- files = [
- 'Dockerfile',
- 'Dockerfile.alt',
- '.dockerignore',
- 'a.py',
- 'a.go',
- 'b.py',
- 'cde.py',
- 'foo/a.py',
- 'foo/b.py',
- 'foo/bar/a.py',
- 'bar/a.py',
- 'foo/Dockerfile3',
- 'target/file.txt',
- 'target/subdir/file.txt',
- 'subdir/file.txt',
- 'subdir/target/file.txt',
- 'subdir/target/subdir/file.txt',
- 'subdir/subdir2/file.txt',
- 'subdir/subdir2/target/file.txt',
- 'subdir/subdir2/target/subdir/file.txt',
- ]
-
- all_paths = set(dirs + files)
-
- def setUp(self):
- self.base = make_tree(self.dirs, self.files)
-
- def tearDown(self):
- shutil.rmtree(self.base)
-
- def exclude(self, patterns, dockerfile=None):
- return set(exclude_paths(self.base, patterns, dockerfile=dockerfile))
-
- def test_no_excludes(self):
- assert self.exclude(['']) == convert_paths(self.all_paths)
-
- def test_no_dupes(self):
- paths = exclude_paths(self.base, ['!a.py'])
- assert sorted(paths) == sorted(set(paths))
-
- def test_wildcard_exclude(self):
- assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore'])
-
- def test_exclude_dockerfile_dockerignore(self):
- """
- Even if the .dockerignore file explicitly says to exclude
- Dockerfile and/or .dockerignore, don't exclude them from
- the actual tar file.
- """
- assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths(
- self.all_paths
- )
-
- def test_exclude_custom_dockerfile(self):
- """
- If we're using a custom Dockerfile, make sure that's not
- excluded.
- """
- assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set(
- ['Dockerfile.alt', '.dockerignore']
- )
-
- assert self.exclude(
- ['*'], dockerfile='foo/Dockerfile3'
- ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
-
- # https://github.com/docker/docker-py/issues/1956
- assert self.exclude(
- ['*'], dockerfile='./foo/Dockerfile3'
- ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore']))
-
- def test_exclude_dockerfile_child(self):
- includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3')
- assert convert_path('foo/Dockerfile3') in includes
- assert convert_path('foo/a.py') not in includes
-
- def test_single_filename(self):
- assert self.exclude(['a.py']) == convert_paths(
- self.all_paths - set(['a.py'])
- )
-
- def test_single_filename_leading_dot_slash(self):
- assert self.exclude(['./a.py']) == convert_paths(
- self.all_paths - set(['a.py'])
- )
-
- # As odd as it sounds, a filename pattern with a trailing slash on the
- # end *will* result in that file being excluded.
- def test_single_filename_trailing_slash(self):
- assert self.exclude(['a.py/']) == convert_paths(
- self.all_paths - set(['a.py'])
- )
-
- def test_wildcard_filename_start(self):
- assert self.exclude(['*.py']) == convert_paths(
- self.all_paths - set(['a.py', 'b.py', 'cde.py'])
- )
-
- def test_wildcard_with_exception(self):
- assert self.exclude(['*.py', '!b.py']) == convert_paths(
- self.all_paths - set(['a.py', 'cde.py'])
- )
-
- def test_wildcard_with_wildcard_exception(self):
- assert self.exclude(['*.*', '!*.go']) == convert_paths(
- self.all_paths - set([
- 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt',
- ])
- )
-
- def test_wildcard_filename_end(self):
- assert self.exclude(['a.*']) == convert_paths(
- self.all_paths - set(['a.py', 'a.go'])
- )
-
- def test_question_mark(self):
- assert self.exclude(['?.py']) == convert_paths(
- self.all_paths - set(['a.py', 'b.py'])
- )
-
- def test_single_subdir_single_filename(self):
- assert self.exclude(['foo/a.py']) == convert_paths(
- self.all_paths - set(['foo/a.py'])
- )
-
- def test_single_subdir_single_filename_leading_slash(self):
- assert self.exclude(['/foo/a.py']) == convert_paths(
- self.all_paths - set(['foo/a.py'])
- )
-
- def test_exclude_include_absolute_path(self):
- base = make_tree([], ['a.py', 'b.py'])
- assert exclude_paths(
- base,
- ['/*', '!/*.py']
- ) == set(['a.py', 'b.py'])
-
- def test_single_subdir_with_path_traversal(self):
- assert self.exclude(['foo/whoops/../a.py']) == convert_paths(
- self.all_paths - set(['foo/a.py'])
- )
-
- def test_single_subdir_wildcard_filename(self):
- assert self.exclude(['foo/*.py']) == convert_paths(
- self.all_paths - set(['foo/a.py', 'foo/b.py'])
- )
-
- def test_wildcard_subdir_single_filename(self):
- assert self.exclude(['*/a.py']) == convert_paths(
- self.all_paths - set(['foo/a.py', 'bar/a.py'])
- )
-
- def test_wildcard_subdir_wildcard_filename(self):
- assert self.exclude(['*/*.py']) == convert_paths(
- self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py'])
- )
-
- def test_directory(self):
- assert self.exclude(['foo']) == convert_paths(
- self.all_paths - set([
- 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py',
- 'foo/Dockerfile3'
- ])
- )
-
- def test_directory_with_trailing_slash(self):
- assert self.exclude(['foo']) == convert_paths(
- self.all_paths - set([
- 'foo', 'foo/a.py', 'foo/b.py',
- 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3'
- ])
- )
-
- def test_directory_with_single_exception(self):
- assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths(
- self.all_paths - set([
- 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar',
- 'foo/Dockerfile3'
- ])
- )
-
- def test_directory_with_subdir_exception(self):
- assert self.exclude(['foo', '!foo/bar']) == convert_paths(
- self.all_paths - set([
- 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
- ])
- )
-
- @pytest.mark.skipif(
- not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
- )
- def test_directory_with_subdir_exception_win32_pathsep(self):
- assert self.exclude(['foo', '!foo\\bar']) == convert_paths(
- self.all_paths - set([
- 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3'
- ])
- )
-
- def test_directory_with_wildcard_exception(self):
- assert self.exclude(['foo', '!foo/*.py']) == convert_paths(
- self.all_paths - set([
- 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3'
- ])
- )
-
- def test_subdirectory(self):
- assert self.exclude(['foo/bar']) == convert_paths(
- self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
- )
-
- @pytest.mark.skipif(
- not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows'
- )
- def test_subdirectory_win32_pathsep(self):
- assert self.exclude(['foo\\bar']) == convert_paths(
- self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
- )
-
- def test_double_wildcard(self):
- assert self.exclude(['**/a.py']) == convert_paths(
- self.all_paths - set(
- ['a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py']
- )
- )
-
- assert self.exclude(['foo/**/bar']) == convert_paths(
- self.all_paths - set(['foo/bar', 'foo/bar/a.py'])
- )
-
- def test_single_and_double_wildcard(self):
- assert self.exclude(['**/target/*/*']) == convert_paths(
- self.all_paths - set(
- ['target/subdir/file.txt',
- 'subdir/target/subdir/file.txt',
- 'subdir/subdir2/target/subdir/file.txt']
- )
- )
-
- def test_trailing_double_wildcard(self):
- assert self.exclude(['subdir/**']) == convert_paths(
- self.all_paths - set(
- ['subdir/file.txt',
- 'subdir/target/file.txt',
- 'subdir/target/subdir/file.txt',
- 'subdir/subdir2/file.txt',
- 'subdir/subdir2/target/file.txt',
- 'subdir/subdir2/target/subdir/file.txt',
- 'subdir/target',
- 'subdir/target/subdir',
- 'subdir/subdir2',
- 'subdir/subdir2/target',
- 'subdir/subdir2/target/subdir']
- )
- )
-
- def test_include_wildcard(self):
- base = make_tree(['a'], ['a/b.py'])
- assert exclude_paths(
- base,
- ['*', '!*/b.py']
- ) == convert_paths(['a/b.py'])
-
- def test_last_line_precedence(self):
- base = make_tree(
- [],
- ['garbage.md',
- 'thrash.md',
- 'README.md',
- 'README-bis.md',
- 'README-secret.md'])
- assert exclude_paths(
- base,
- ['*.md', '!README*.md', 'README-secret.md']
- ) == set(['README.md', 'README-bis.md'])
-
- def test_parent_directory(self):
- base = make_tree(
- [],
- ['a.py',
- 'b.py',
- 'c.py'])
- # Dockerignore reference stipulates that absolute paths are
- # equivalent to relative paths, hence /../foo should be
- # equivalent to ../foo. It also stipulates that paths are run
- # through Go's filepath.Clean, which explicitely "replace
- # "/.." by "/" at the beginning of a path".
- assert exclude_paths(
- base,
- ['../a.py', '/../b.py']
- ) == set(['c.py'])
-
-
-class TarTest(unittest.TestCase):
- def test_tar_with_excludes(self):
- dirs = [
- 'foo',
- 'foo/bar',
- 'bar',
- ]
-
- files = [
- 'Dockerfile',
- 'Dockerfile.alt',
- '.dockerignore',
- 'a.py',
- 'a.go',
- 'b.py',
- 'cde.py',
- 'foo/a.py',
- 'foo/b.py',
- 'foo/bar/a.py',
- 'bar/a.py',
- ]
-
- exclude = [
- '*.py',
- '!b.py',
- '!a.go',
- 'foo',
- 'Dockerfile*',
- '.dockerignore',
- ]
-
- expected_names = set([
- 'Dockerfile',
- '.dockerignore',
- 'a.go',
- 'b.py',
- 'bar',
- 'bar/a.py',
- ])
-
- base = make_tree(dirs, files)
- self.addCleanup(shutil.rmtree, base)
-
- with tar(base, exclude=exclude) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == sorted(expected_names)
-
- def test_tar_with_empty_directory(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == ['bar', 'foo']
-
- @pytest.mark.skipif(
- IS_WINDOWS_PLATFORM or os.geteuid() == 0,
- reason='root user always has access ; no chmod on Windows'
- )
- def test_tar_with_inaccessible_file(self):
- base = tempfile.mkdtemp()
- full_path = os.path.join(base, 'foo')
- self.addCleanup(shutil.rmtree, base)
- with open(full_path, 'w') as f:
- f.write('content')
- os.chmod(full_path, 0o222)
- with pytest.raises(IOError) as ei:
- tar(base)
-
- assert 'Can not read file in context: {}'.format(full_path) in (
- ei.exconly()
- )
-
- @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
- def test_tar_with_file_symlinks(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- with open(os.path.join(base, 'foo'), 'w') as f:
- f.write("content")
- os.makedirs(os.path.join(base, 'bar'))
- os.symlink('../foo', os.path.join(base, 'bar/foo'))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
-
- @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
- def test_tar_with_directory_symlinks(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
- os.symlink('../foo', os.path.join(base, 'bar/foo'))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
-
- @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
- def test_tar_with_broken_symlinks(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
-
- os.symlink('../baz', os.path.join(base, 'bar/foo'))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo']
-
- @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32')
- def test_tar_socket_file(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
- sock = socket.socket(socket.AF_UNIX)
- self.addCleanup(sock.close)
- sock.bind(os.path.join(base, 'test.sock'))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert sorted(tar_data.getnames()) == ['bar', 'foo']
-
- def tar_test_negative_mtime_bug(self):
- base = tempfile.mkdtemp()
- filename = os.path.join(base, 'th.txt')
- self.addCleanup(shutil.rmtree, base)
- with open(filename, 'w') as f:
- f.write('Invisible Full Moon')
- os.utime(filename, (12345, -3600.0))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- assert tar_data.getnames() == ['th.txt']
- assert tar_data.getmember('th.txt').mtime == -3600
-
- @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows')
- def test_tar_directory_link(self):
- dirs = ['a', 'b', 'a/c']
- files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py']
- base = make_tree(dirs, files)
- self.addCleanup(shutil.rmtree, base)
- os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b'))
- with tar(base) as archive:
- tar_data = tarfile.open(fileobj=archive)
- names = tar_data.getnames()
- for member in dirs + files:
- assert member in names
- assert 'a/c/b' in names
- assert 'a/c/b/utils.py' not in names
-
-
class FormatEnvironmentTest(unittest.TestCase):
def test_format_env_binary_unicode_value(self):
env_dict = {