diff options
Diffstat (limited to 'tests/unit')
-rw-r--r-- | tests/unit/api_container_test.py | 4 | ||||
-rw-r--r-- | tests/unit/api_test.py | 60 | ||||
-rw-r--r-- | tests/unit/auth_test.py | 66 | ||||
-rw-r--r-- | tests/unit/fake_api_client.py | 16 | ||||
-rw-r--r-- | tests/unit/models_containers_test.py | 12 | ||||
-rw-r--r-- | tests/unit/utils_build_test.py | 493 | ||||
-rw-r--r-- | tests/unit/utils_test.py | 480 |
7 files changed, 634 insertions, 497 deletions
diff --git a/tests/unit/api_container_test.py b/tests/unit/api_container_test.py index c33f129..a7e183c 100644 --- a/tests/unit/api_container_test.py +++ b/tests/unit/api_container_test.py @@ -1335,7 +1335,7 @@ class ContainerTest(BaseAPIClientTest): 'POST', url_prefix + 'containers/3cc2351ab11b/restart', params={'t': 2}, - timeout=DEFAULT_TIMEOUT_SECONDS + timeout=(DEFAULT_TIMEOUT_SECONDS + 2) ) def test_restart_container_with_dict_instead_of_id(self): @@ -1345,7 +1345,7 @@ class ContainerTest(BaseAPIClientTest): 'POST', url_prefix + 'containers/3cc2351ab11b/restart', params={'t': 2}, - timeout=DEFAULT_TIMEOUT_SECONDS + timeout=(DEFAULT_TIMEOUT_SECONDS + 2) ) def test_remove_container(self): diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py index 46cbd68..af2bb1c 100644 --- a/tests/unit/api_test.py +++ b/tests/unit/api_test.py @@ -44,7 +44,7 @@ def response(status_code=200, content='', headers=None, reason=None, elapsed=0, return res -def fake_resolve_authconfig(authconfig, registry=None): +def fake_resolve_authconfig(authconfig, registry=None, *args, **kwargs): return None @@ -365,7 +365,7 @@ class DockerApiTest(BaseAPIClientTest): assert result == content -class StreamTest(unittest.TestCase): +class UnixSocketStreamTest(unittest.TestCase): def setUp(self): socket_dir = tempfile.mkdtemp() self.build_context = tempfile.mkdtemp() @@ -462,7 +462,61 @@ class StreamTest(unittest.TestCase): raise e assert list(stream) == [ - str(i).encode() for i in range(50)] + str(i).encode() for i in range(50) + ] + + +class TCPSocketStreamTest(unittest.TestCase): + text_data = b''' + Now, those children out there, they're jumping through the + flames in the hope that the god of the fire will make them fruitful. + Really, you can't blame them. After all, what girl would not prefer the + child of a god to that of some acne-scarred artisan? + ''' + + def setUp(self): + + self.server = six.moves.socketserver.ThreadingTCPServer( + ('', 0), self.get_handler_class() + ) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.setDaemon(True) + self.thread.start() + self.address = 'http://{}:{}'.format( + socket.gethostname(), self.server.server_address[1] + ) + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + self.thread.join() + + def get_handler_class(self): + text_data = self.text_data + + class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object): + def do_POST(self): + self.send_response(101) + self.send_header( + 'Content-Type', 'application/vnd.docker.raw-stream' + ) + self.send_header('Connection', 'Upgrade') + self.send_header('Upgrade', 'tcp') + self.end_headers() + self.wfile.flush() + time.sleep(0.2) + self.wfile.write(text_data) + self.wfile.flush() + + return Handler + + def test_read_from_socket(self): + with APIClient(base_url=self.address) as client: + resp = client._post(client._url('/dummy'), stream=True) + data = client._read_from_socket(resp, stream=True, tty=True) + results = b''.join(data) + + assert results == self.text_data class UserAgentTest(unittest.TestCase): diff --git a/tests/unit/auth_test.py b/tests/unit/auth_test.py index ee32ca0..947d680 100644 --- a/tests/unit/auth_test.py +++ b/tests/unit/auth_test.py @@ -282,22 +282,64 @@ class LoadConfigTest(unittest.TestCase): cfg = auth.load_config(folder) assert cfg is not None - def test_load_config(self): + def test_load_legacy_config(self): folder = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, folder) - dockercfg_path = os.path.join(folder, '.dockercfg') - with open(dockercfg_path, 'w') as f: - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + cfg_path = os.path.join(folder, '.dockercfg') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + with open(cfg_path, 'w') as f: f.write('auth = {0}\n'.format(auth_)) f.write('email = sakuya@scarlet.net') - cfg = auth.load_config(dockercfg_path) - assert auth.INDEX_NAME in cfg - assert cfg[auth.INDEX_NAME] is not None - cfg = cfg[auth.INDEX_NAME] + + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg['auths'][auth.INDEX_NAME] is not None + cfg = cfg['auths'][auth.INDEX_NAME] assert cfg['username'] == 'sakuya' assert cfg['password'] == 'izayoi' assert cfg['email'] == 'sakuya@scarlet.net' - assert cfg.get('auth') is None + assert cfg.get('Auth') is None + + def test_load_json_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg_path = os.path.join(folder, '.dockercfg') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + email = 'sakuya@scarlet.net' + with open(cfg_path, 'w') as f: + json.dump( + {auth.INDEX_URL: {'auth': auth_, 'email': email}}, f + ) + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg['auths'][auth.INDEX_URL] is not None + cfg = cfg['auths'][auth.INDEX_URL] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == email + assert cfg.get('Auth') is None + + def test_load_modern_json_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg_path = os.path.join(folder, 'config.json') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + email = 'sakuya@scarlet.net' + with open(cfg_path, 'w') as f: + json.dump({ + 'auths': { + auth.INDEX_URL: { + 'auth': auth_, 'email': email + } + } + }, f) + cfg = auth.load_config(cfg_path) + assert auth.resolve_authconfig(cfg) is not None + assert cfg['auths'][auth.INDEX_URL] is not None + cfg = cfg['auths'][auth.INDEX_URL] + assert cfg['username'] == 'sakuya' + assert cfg['password'] == 'izayoi' + assert cfg['email'] == email def test_load_config_with_random_name(self): folder = tempfile.mkdtemp() @@ -318,7 +360,7 @@ class LoadConfigTest(unittest.TestCase): with open(dockercfg_path, 'w') as f: json.dump(config, f) - cfg = auth.load_config(dockercfg_path) + cfg = auth.load_config(dockercfg_path)['auths'] assert registry in cfg assert cfg[registry] is not None cfg = cfg[registry] @@ -345,7 +387,7 @@ class LoadConfigTest(unittest.TestCase): json.dump(config, f) with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): - cfg = auth.load_config(None) + cfg = auth.load_config(None)['auths'] assert registry in cfg assert cfg[registry] is not None cfg = cfg[registry] @@ -422,7 +464,7 @@ class LoadConfigTest(unittest.TestCase): json.dump(config, f) cfg = auth.load_config(dockercfg_path) - assert cfg == {} + assert cfg == {'auths': {}} def test_load_config_invalid_auth_dict(self): folder = tempfile.mkdtemp() diff --git a/tests/unit/fake_api_client.py b/tests/unit/fake_api_client.py index 15b60ea..2147bfd 100644 --- a/tests/unit/fake_api_client.py +++ b/tests/unit/fake_api_client.py @@ -20,15 +20,18 @@ class CopyReturnMagicMock(mock.MagicMock): return ret -def make_fake_api_client(): +def make_fake_api_client(overrides=None): """ Returns non-complete fake APIClient. This returns most of the default cases correctly, but most arguments that change behaviour will not work. """ + + if overrides is None: + overrides = {} api_client = docker.APIClient() - mock_client = CopyReturnMagicMock(**{ + mock_attrs = { 'build.return_value': fake_api.FAKE_IMAGE_ID, 'commit.return_value': fake_api.post_fake_commit()[1], 'containers.return_value': fake_api.get_fake_containers()[1], @@ -47,15 +50,18 @@ def make_fake_api_client(): 'networks.return_value': fake_api.get_fake_network_list()[1], 'start.return_value': None, 'wait.return_value': {'StatusCode': 0}, - }) + } + mock_attrs.update(overrides) + mock_client = CopyReturnMagicMock(**mock_attrs) + mock_client._version = docker.constants.DEFAULT_DOCKER_API_VERSION return mock_client -def make_fake_client(): +def make_fake_client(overrides=None): """ Returns a Client with a fake APIClient. """ client = docker.DockerClient() - client.api = make_fake_api_client() + client.api = make_fake_api_client(overrides) return client diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py index 2b0b499..48a5288 100644 --- a/tests/unit/models_containers_test.py +++ b/tests/unit/models_containers_test.py @@ -359,6 +359,18 @@ class ContainerCollectionTest(unittest.TestCase): assert isinstance(containers[0], Container) assert containers[0].id == FAKE_CONTAINER_ID + def test_list_ignore_removed(self): + def side_effect(*args, **kwargs): + raise docker.errors.NotFound('Container not found') + client = make_fake_client({ + 'inspect_container.side_effect': side_effect + }) + + with pytest.raises(docker.errors.NotFound): + client.containers.list(all=True, ignore_removed=False) + + assert client.containers.list(all=True, ignore_removed=True) == [] + class ContainerTest(unittest.TestCase): def test_name(self): diff --git a/tests/unit/utils_build_test.py b/tests/unit/utils_build_test.py new file mode 100644 index 0000000..012f15b --- /dev/null +++ b/tests/unit/utils_build_test.py @@ -0,0 +1,493 @@ +# -*- coding: utf-8 -*- + +import os +import os.path +import shutil +import socket +import tarfile +import tempfile +import unittest + + +from docker.constants import IS_WINDOWS_PLATFORM +from docker.utils import exclude_paths, tar + +import pytest + +from ..helpers import make_tree + + +def convert_paths(collection): + return set(map(convert_path, collection)) + + +def convert_path(path): + return path.replace('/', os.path.sep) + + +class ExcludePathsTest(unittest.TestCase): + dirs = [ + 'foo', + 'foo/bar', + 'bar', + 'target', + 'target/subdir', + 'subdir', + 'subdir/target', + 'subdir/target/subdir', + 'subdir/subdir2', + 'subdir/subdir2/target', + 'subdir/subdir2/target/subdir' + ] + + files = [ + 'Dockerfile', + 'Dockerfile.alt', + '.dockerignore', + 'a.py', + 'a.go', + 'b.py', + 'cde.py', + 'foo/a.py', + 'foo/b.py', + 'foo/bar/a.py', + 'bar/a.py', + 'foo/Dockerfile3', + 'target/file.txt', + 'target/subdir/file.txt', + 'subdir/file.txt', + 'subdir/target/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/file.txt', + 'subdir/subdir2/target/file.txt', + 'subdir/subdir2/target/subdir/file.txt', + ] + + all_paths = set(dirs + files) + + def setUp(self): + self.base = make_tree(self.dirs, self.files) + + def tearDown(self): + shutil.rmtree(self.base) + + def exclude(self, patterns, dockerfile=None): + return set(exclude_paths(self.base, patterns, dockerfile=dockerfile)) + + def test_no_excludes(self): + assert self.exclude(['']) == convert_paths(self.all_paths) + + def test_no_dupes(self): + paths = exclude_paths(self.base, ['!a.py']) + assert sorted(paths) == sorted(set(paths)) + + def test_wildcard_exclude(self): + assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore']) + + def test_exclude_dockerfile_dockerignore(self): + """ + Even if the .dockerignore file explicitly says to exclude + Dockerfile and/or .dockerignore, don't exclude them from + the actual tar file. + """ + assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths( + self.all_paths + ) + + def test_exclude_custom_dockerfile(self): + """ + If we're using a custom Dockerfile, make sure that's not + excluded. + """ + assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set( + ['Dockerfile.alt', '.dockerignore'] + ) + + assert self.exclude( + ['*'], dockerfile='foo/Dockerfile3' + ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) + + # https://github.com/docker/docker-py/issues/1956 + assert self.exclude( + ['*'], dockerfile='./foo/Dockerfile3' + ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) + + def test_exclude_dockerfile_child(self): + includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3') + assert convert_path('foo/Dockerfile3') in includes + assert convert_path('foo/a.py') not in includes + + def test_single_filename(self): + assert self.exclude(['a.py']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + def test_single_filename_leading_dot_slash(self): + assert self.exclude(['./a.py']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + # As odd as it sounds, a filename pattern with a trailing slash on the + # end *will* result in that file being excluded. + def test_single_filename_trailing_slash(self): + assert self.exclude(['a.py/']) == convert_paths( + self.all_paths - set(['a.py']) + ) + + def test_wildcard_filename_start(self): + assert self.exclude(['*.py']) == convert_paths( + self.all_paths - set(['a.py', 'b.py', 'cde.py']) + ) + + def test_wildcard_with_exception(self): + assert self.exclude(['*.py', '!b.py']) == convert_paths( + self.all_paths - set(['a.py', 'cde.py']) + ) + + def test_wildcard_with_wildcard_exception(self): + assert self.exclude(['*.*', '!*.go']) == convert_paths( + self.all_paths - set([ + 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt', + ]) + ) + + def test_wildcard_filename_end(self): + assert self.exclude(['a.*']) == convert_paths( + self.all_paths - set(['a.py', 'a.go']) + ) + + def test_question_mark(self): + assert self.exclude(['?.py']) == convert_paths( + self.all_paths - set(['a.py', 'b.py']) + ) + + def test_single_subdir_single_filename(self): + assert self.exclude(['foo/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_single_subdir_single_filename_leading_slash(self): + assert self.exclude(['/foo/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_exclude_include_absolute_path(self): + base = make_tree([], ['a.py', 'b.py']) + assert exclude_paths( + base, + ['/*', '!/*.py'] + ) == set(['a.py', 'b.py']) + + def test_single_subdir_with_path_traversal(self): + assert self.exclude(['foo/whoops/../a.py']) == convert_paths( + self.all_paths - set(['foo/a.py']) + ) + + def test_single_subdir_wildcard_filename(self): + assert self.exclude(['foo/*.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'foo/b.py']) + ) + + def test_wildcard_subdir_single_filename(self): + assert self.exclude(['*/a.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'bar/a.py']) + ) + + def test_wildcard_subdir_wildcard_filename(self): + assert self.exclude(['*/*.py']) == convert_paths( + self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py']) + ) + + def test_directory(self): + assert self.exclude(['foo']) == convert_paths( + self.all_paths - set([ + 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py', + 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_trailing_slash(self): + assert self.exclude(['foo']) == convert_paths( + self.all_paths - set([ + 'foo', 'foo/a.py', 'foo/b.py', + 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_single_exception(self): + assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar', + 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_subdir_exception(self): + assert self.exclude(['foo', '!foo/bar']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + @pytest.mark.skipif( + not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' + ) + def test_directory_with_subdir_exception_win32_pathsep(self): + assert self.exclude(['foo', '!foo\\bar']) == convert_paths( + self.all_paths - set([ + 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + def test_directory_with_wildcard_exception(self): + assert self.exclude(['foo', '!foo/*.py']) == convert_paths( + self.all_paths - set([ + 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3' + ]) + ) + + def test_subdirectory(self): + assert self.exclude(['foo/bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + @pytest.mark.skipif( + not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' + ) + def test_subdirectory_win32_pathsep(self): + assert self.exclude(['foo\\bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + def test_double_wildcard(self): + assert self.exclude(['**/a.py']) == convert_paths( + self.all_paths - set( + ['a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py'] + ) + ) + + assert self.exclude(['foo/**/bar']) == convert_paths( + self.all_paths - set(['foo/bar', 'foo/bar/a.py']) + ) + + def test_single_and_double_wildcard(self): + assert self.exclude(['**/target/*/*']) == convert_paths( + self.all_paths - set( + ['target/subdir/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/target/subdir/file.txt'] + ) + ) + + def test_trailing_double_wildcard(self): + assert self.exclude(['subdir/**']) == convert_paths( + self.all_paths - set( + ['subdir/file.txt', + 'subdir/target/file.txt', + 'subdir/target/subdir/file.txt', + 'subdir/subdir2/file.txt', + 'subdir/subdir2/target/file.txt', + 'subdir/subdir2/target/subdir/file.txt', + 'subdir/target', + 'subdir/target/subdir', + 'subdir/subdir2', + 'subdir/subdir2/target', + 'subdir/subdir2/target/subdir'] + ) + ) + + def test_double_wildcard_with_exception(self): + assert self.exclude(['**', '!bar', '!foo/bar']) == convert_paths( + set([ + 'foo/bar', 'foo/bar/a.py', 'bar', 'bar/a.py', 'Dockerfile', + '.dockerignore', + ]) + ) + + def test_include_wildcard(self): + # This may be surprising but it matches the CLI's behavior + # (tested with 18.05.0-ce on linux) + base = make_tree(['a'], ['a/b.py']) + assert exclude_paths( + base, + ['*', '!*/b.py'] + ) == set() + + def test_last_line_precedence(self): + base = make_tree( + [], + ['garbage.md', + 'trash.md', + 'README.md', + 'README-bis.md', + 'README-secret.md']) + assert exclude_paths( + base, + ['*.md', '!README*.md', 'README-secret.md'] + ) == set(['README.md', 'README-bis.md']) + + def test_parent_directory(self): + base = make_tree( + [], + ['a.py', + 'b.py', + 'c.py']) + # Dockerignore reference stipulates that absolute paths are + # equivalent to relative paths, hence /../foo should be + # equivalent to ../foo. It also stipulates that paths are run + # through Go's filepath.Clean, which explicitely "replace + # "/.." by "/" at the beginning of a path". + assert exclude_paths( + base, + ['../a.py', '/../b.py'] + ) == set(['c.py']) + + +class TarTest(unittest.TestCase): + def test_tar_with_excludes(self): + dirs = [ + 'foo', + 'foo/bar', + 'bar', + ] + + files = [ + 'Dockerfile', + 'Dockerfile.alt', + '.dockerignore', + 'a.py', + 'a.go', + 'b.py', + 'cde.py', + 'foo/a.py', + 'foo/b.py', + 'foo/bar/a.py', + 'bar/a.py', + ] + + exclude = [ + '*.py', + '!b.py', + '!a.go', + 'foo', + 'Dockerfile*', + '.dockerignore', + ] + + expected_names = set([ + 'Dockerfile', + '.dockerignore', + 'a.go', + 'b.py', + 'bar', + 'bar/a.py', + ]) + + base = make_tree(dirs, files) + self.addCleanup(shutil.rmtree, base) + + with tar(base, exclude=exclude) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == sorted(expected_names) + + def test_tar_with_empty_directory(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'foo'] + + @pytest.mark.skipif( + IS_WINDOWS_PLATFORM or os.geteuid() == 0, + reason='root user always has access ; no chmod on Windows' + ) + def test_tar_with_inaccessible_file(self): + base = tempfile.mkdtemp() + full_path = os.path.join(base, 'foo') + self.addCleanup(shutil.rmtree, base) + with open(full_path, 'w') as f: + f.write('content') + os.chmod(full_path, 0o222) + with pytest.raises(IOError) as ei: + tar(base) + + assert 'Can not read file in context: {}'.format(full_path) in ( + ei.exconly() + ) + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_file_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + with open(os.path.join(base, 'foo'), 'w') as f: + f.write("content") + os.makedirs(os.path.join(base, 'bar')) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_directory_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_with_broken_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + + os.symlink('../baz', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32') + def test_tar_socket_file(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + sock = socket.socket(socket.AF_UNIX) + self.addCleanup(sock.close) + sock.bind(os.path.join(base, 'test.sock')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == ['bar', 'foo'] + + def tar_test_negative_mtime_bug(self): + base = tempfile.mkdtemp() + filename = os.path.join(base, 'th.txt') + self.addCleanup(shutil.rmtree, base) + with open(filename, 'w') as f: + f.write('Invisible Full Moon') + os.utime(filename, (12345, -3600.0)) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + assert tar_data.getnames() == ['th.txt'] + assert tar_data.getmember('th.txt').mtime == -3600 + + @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') + def test_tar_directory_link(self): + dirs = ['a', 'b', 'a/c'] + files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py'] + base = make_tree(dirs, files) + self.addCleanup(shutil.rmtree, base) + os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + names = tar_data.getnames() + for member in dirs + files: + assert member in names + assert 'a/c/b' in names + assert 'a/c/b/utils.py' not in names diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py index 00456e8..8880cfe 100644 --- a/tests/unit/utils_test.py +++ b/tests/unit/utils_test.py @@ -5,29 +5,25 @@ import json import os import os.path import shutil -import socket import sys -import tarfile import tempfile import unittest -import pytest -import six from docker.api.client import APIClient -from docker.constants import IS_WINDOWS_PLATFORM from docker.errors import DockerException from docker.utils import ( - parse_repository_tag, parse_host, convert_filters, kwargs_from_env, - parse_bytes, parse_env_file, exclude_paths, convert_volume_binds, - decode_json_header, tar, split_command, parse_devices, update_headers, + convert_filters, convert_volume_binds, decode_json_header, kwargs_from_env, + parse_bytes, parse_devices, parse_env_file, parse_host, + parse_repository_tag, split_command, update_headers, ) from docker.utils.ports import build_port_bindings, split_port from docker.utils.utils import format_environment -from ..helpers import make_tree +import pytest +import six TEST_CERT_DIR = os.path.join( os.path.dirname(__file__), @@ -608,472 +604,6 @@ class PortsTest(unittest.TestCase): assert port_bindings["2000"] == [("127.0.0.1", "2000")] -def convert_paths(collection): - return set(map(convert_path, collection)) - - -def convert_path(path): - return path.replace('/', os.path.sep) - - -class ExcludePathsTest(unittest.TestCase): - dirs = [ - 'foo', - 'foo/bar', - 'bar', - 'target', - 'target/subdir', - 'subdir', - 'subdir/target', - 'subdir/target/subdir', - 'subdir/subdir2', - 'subdir/subdir2/target', - 'subdir/subdir2/target/subdir' - ] - - files = [ - 'Dockerfile', - 'Dockerfile.alt', - '.dockerignore', - 'a.py', - 'a.go', - 'b.py', - 'cde.py', - 'foo/a.py', - 'foo/b.py', - 'foo/bar/a.py', - 'bar/a.py', - 'foo/Dockerfile3', - 'target/file.txt', - 'target/subdir/file.txt', - 'subdir/file.txt', - 'subdir/target/file.txt', - 'subdir/target/subdir/file.txt', - 'subdir/subdir2/file.txt', - 'subdir/subdir2/target/file.txt', - 'subdir/subdir2/target/subdir/file.txt', - ] - - all_paths = set(dirs + files) - - def setUp(self): - self.base = make_tree(self.dirs, self.files) - - def tearDown(self): - shutil.rmtree(self.base) - - def exclude(self, patterns, dockerfile=None): - return set(exclude_paths(self.base, patterns, dockerfile=dockerfile)) - - def test_no_excludes(self): - assert self.exclude(['']) == convert_paths(self.all_paths) - - def test_no_dupes(self): - paths = exclude_paths(self.base, ['!a.py']) - assert sorted(paths) == sorted(set(paths)) - - def test_wildcard_exclude(self): - assert self.exclude(['*']) == set(['Dockerfile', '.dockerignore']) - - def test_exclude_dockerfile_dockerignore(self): - """ - Even if the .dockerignore file explicitly says to exclude - Dockerfile and/or .dockerignore, don't exclude them from - the actual tar file. - """ - assert self.exclude(['Dockerfile', '.dockerignore']) == convert_paths( - self.all_paths - ) - - def test_exclude_custom_dockerfile(self): - """ - If we're using a custom Dockerfile, make sure that's not - excluded. - """ - assert self.exclude(['*'], dockerfile='Dockerfile.alt') == set( - ['Dockerfile.alt', '.dockerignore'] - ) - - assert self.exclude( - ['*'], dockerfile='foo/Dockerfile3' - ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) - - # https://github.com/docker/docker-py/issues/1956 - assert self.exclude( - ['*'], dockerfile='./foo/Dockerfile3' - ) == convert_paths(set(['foo/Dockerfile3', '.dockerignore'])) - - def test_exclude_dockerfile_child(self): - includes = self.exclude(['foo/'], dockerfile='foo/Dockerfile3') - assert convert_path('foo/Dockerfile3') in includes - assert convert_path('foo/a.py') not in includes - - def test_single_filename(self): - assert self.exclude(['a.py']) == convert_paths( - self.all_paths - set(['a.py']) - ) - - def test_single_filename_leading_dot_slash(self): - assert self.exclude(['./a.py']) == convert_paths( - self.all_paths - set(['a.py']) - ) - - # As odd as it sounds, a filename pattern with a trailing slash on the - # end *will* result in that file being excluded. - def test_single_filename_trailing_slash(self): - assert self.exclude(['a.py/']) == convert_paths( - self.all_paths - set(['a.py']) - ) - - def test_wildcard_filename_start(self): - assert self.exclude(['*.py']) == convert_paths( - self.all_paths - set(['a.py', 'b.py', 'cde.py']) - ) - - def test_wildcard_with_exception(self): - assert self.exclude(['*.py', '!b.py']) == convert_paths( - self.all_paths - set(['a.py', 'cde.py']) - ) - - def test_wildcard_with_wildcard_exception(self): - assert self.exclude(['*.*', '!*.go']) == convert_paths( - self.all_paths - set([ - 'a.py', 'b.py', 'cde.py', 'Dockerfile.alt', - ]) - ) - - def test_wildcard_filename_end(self): - assert self.exclude(['a.*']) == convert_paths( - self.all_paths - set(['a.py', 'a.go']) - ) - - def test_question_mark(self): - assert self.exclude(['?.py']) == convert_paths( - self.all_paths - set(['a.py', 'b.py']) - ) - - def test_single_subdir_single_filename(self): - assert self.exclude(['foo/a.py']) == convert_paths( - self.all_paths - set(['foo/a.py']) - ) - - def test_single_subdir_single_filename_leading_slash(self): - assert self.exclude(['/foo/a.py']) == convert_paths( - self.all_paths - set(['foo/a.py']) - ) - - def test_exclude_include_absolute_path(self): - base = make_tree([], ['a.py', 'b.py']) - assert exclude_paths( - base, - ['/*', '!/*.py'] - ) == set(['a.py', 'b.py']) - - def test_single_subdir_with_path_traversal(self): - assert self.exclude(['foo/whoops/../a.py']) == convert_paths( - self.all_paths - set(['foo/a.py']) - ) - - def test_single_subdir_wildcard_filename(self): - assert self.exclude(['foo/*.py']) == convert_paths( - self.all_paths - set(['foo/a.py', 'foo/b.py']) - ) - - def test_wildcard_subdir_single_filename(self): - assert self.exclude(['*/a.py']) == convert_paths( - self.all_paths - set(['foo/a.py', 'bar/a.py']) - ) - - def test_wildcard_subdir_wildcard_filename(self): - assert self.exclude(['*/*.py']) == convert_paths( - self.all_paths - set(['foo/a.py', 'foo/b.py', 'bar/a.py']) - ) - - def test_directory(self): - assert self.exclude(['foo']) == convert_paths( - self.all_paths - set([ - 'foo', 'foo/a.py', 'foo/b.py', 'foo/bar', 'foo/bar/a.py', - 'foo/Dockerfile3' - ]) - ) - - def test_directory_with_trailing_slash(self): - assert self.exclude(['foo']) == convert_paths( - self.all_paths - set([ - 'foo', 'foo/a.py', 'foo/b.py', - 'foo/bar', 'foo/bar/a.py', 'foo/Dockerfile3' - ]) - ) - - def test_directory_with_single_exception(self): - assert self.exclude(['foo', '!foo/bar/a.py']) == convert_paths( - self.all_paths - set([ - 'foo/a.py', 'foo/b.py', 'foo', 'foo/bar', - 'foo/Dockerfile3' - ]) - ) - - def test_directory_with_subdir_exception(self): - assert self.exclude(['foo', '!foo/bar']) == convert_paths( - self.all_paths - set([ - 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' - ]) - ) - - @pytest.mark.skipif( - not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' - ) - def test_directory_with_subdir_exception_win32_pathsep(self): - assert self.exclude(['foo', '!foo\\bar']) == convert_paths( - self.all_paths - set([ - 'foo/a.py', 'foo/b.py', 'foo', 'foo/Dockerfile3' - ]) - ) - - def test_directory_with_wildcard_exception(self): - assert self.exclude(['foo', '!foo/*.py']) == convert_paths( - self.all_paths - set([ - 'foo/bar', 'foo/bar/a.py', 'foo', 'foo/Dockerfile3' - ]) - ) - - def test_subdirectory(self): - assert self.exclude(['foo/bar']) == convert_paths( - self.all_paths - set(['foo/bar', 'foo/bar/a.py']) - ) - - @pytest.mark.skipif( - not IS_WINDOWS_PLATFORM, reason='Backslash patterns only on Windows' - ) - def test_subdirectory_win32_pathsep(self): - assert self.exclude(['foo\\bar']) == convert_paths( - self.all_paths - set(['foo/bar', 'foo/bar/a.py']) - ) - - def test_double_wildcard(self): - assert self.exclude(['**/a.py']) == convert_paths( - self.all_paths - set( - ['a.py', 'foo/a.py', 'foo/bar/a.py', 'bar/a.py'] - ) - ) - - assert self.exclude(['foo/**/bar']) == convert_paths( - self.all_paths - set(['foo/bar', 'foo/bar/a.py']) - ) - - def test_single_and_double_wildcard(self): - assert self.exclude(['**/target/*/*']) == convert_paths( - self.all_paths - set( - ['target/subdir/file.txt', - 'subdir/target/subdir/file.txt', - 'subdir/subdir2/target/subdir/file.txt'] - ) - ) - - def test_trailing_double_wildcard(self): - assert self.exclude(['subdir/**']) == convert_paths( - self.all_paths - set( - ['subdir/file.txt', - 'subdir/target/file.txt', - 'subdir/target/subdir/file.txt', - 'subdir/subdir2/file.txt', - 'subdir/subdir2/target/file.txt', - 'subdir/subdir2/target/subdir/file.txt', - 'subdir/target', - 'subdir/target/subdir', - 'subdir/subdir2', - 'subdir/subdir2/target', - 'subdir/subdir2/target/subdir'] - ) - ) - - def test_include_wildcard(self): - base = make_tree(['a'], ['a/b.py']) - assert exclude_paths( - base, - ['*', '!*/b.py'] - ) == convert_paths(['a/b.py']) - - def test_last_line_precedence(self): - base = make_tree( - [], - ['garbage.md', - 'thrash.md', - 'README.md', - 'README-bis.md', - 'README-secret.md']) - assert exclude_paths( - base, - ['*.md', '!README*.md', 'README-secret.md'] - ) == set(['README.md', 'README-bis.md']) - - def test_parent_directory(self): - base = make_tree( - [], - ['a.py', - 'b.py', - 'c.py']) - # Dockerignore reference stipulates that absolute paths are - # equivalent to relative paths, hence /../foo should be - # equivalent to ../foo. It also stipulates that paths are run - # through Go's filepath.Clean, which explicitely "replace - # "/.." by "/" at the beginning of a path". - assert exclude_paths( - base, - ['../a.py', '/../b.py'] - ) == set(['c.py']) - - -class TarTest(unittest.TestCase): - def test_tar_with_excludes(self): - dirs = [ - 'foo', - 'foo/bar', - 'bar', - ] - - files = [ - 'Dockerfile', - 'Dockerfile.alt', - '.dockerignore', - 'a.py', - 'a.go', - 'b.py', - 'cde.py', - 'foo/a.py', - 'foo/b.py', - 'foo/bar/a.py', - 'bar/a.py', - ] - - exclude = [ - '*.py', - '!b.py', - '!a.go', - 'foo', - 'Dockerfile*', - '.dockerignore', - ] - - expected_names = set([ - 'Dockerfile', - '.dockerignore', - 'a.go', - 'b.py', - 'bar', - 'bar/a.py', - ]) - - base = make_tree(dirs, files) - self.addCleanup(shutil.rmtree, base) - - with tar(base, exclude=exclude) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == sorted(expected_names) - - def test_tar_with_empty_directory(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == ['bar', 'foo'] - - @pytest.mark.skipif( - IS_WINDOWS_PLATFORM or os.geteuid() == 0, - reason='root user always has access ; no chmod on Windows' - ) - def test_tar_with_inaccessible_file(self): - base = tempfile.mkdtemp() - full_path = os.path.join(base, 'foo') - self.addCleanup(shutil.rmtree, base) - with open(full_path, 'w') as f: - f.write('content') - os.chmod(full_path, 0o222) - with pytest.raises(IOError) as ei: - tar(base) - - assert 'Can not read file in context: {}'.format(full_path) in ( - ei.exconly() - ) - - @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') - def test_tar_with_file_symlinks(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - with open(os.path.join(base, 'foo'), 'w') as f: - f.write("content") - os.makedirs(os.path.join(base, 'bar')) - os.symlink('../foo', os.path.join(base, 'bar/foo')) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] - - @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') - def test_tar_with_directory_symlinks(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - os.symlink('../foo', os.path.join(base, 'bar/foo')) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] - - @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') - def test_tar_with_broken_symlinks(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - - os.symlink('../baz', os.path.join(base, 'bar/foo')) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == ['bar', 'bar/foo', 'foo'] - - @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No UNIX sockets on Win32') - def test_tar_socket_file(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - sock = socket.socket(socket.AF_UNIX) - self.addCleanup(sock.close) - sock.bind(os.path.join(base, 'test.sock')) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert sorted(tar_data.getnames()) == ['bar', 'foo'] - - def tar_test_negative_mtime_bug(self): - base = tempfile.mkdtemp() - filename = os.path.join(base, 'th.txt') - self.addCleanup(shutil.rmtree, base) - with open(filename, 'w') as f: - f.write('Invisible Full Moon') - os.utime(filename, (12345, -3600.0)) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - assert tar_data.getnames() == ['th.txt'] - assert tar_data.getmember('th.txt').mtime == -3600 - - @pytest.mark.skipif(IS_WINDOWS_PLATFORM, reason='No symlinks on Windows') - def test_tar_directory_link(self): - dirs = ['a', 'b', 'a/c'] - files = ['a/hello.py', 'b/utils.py', 'a/c/descend.py'] - base = make_tree(dirs, files) - self.addCleanup(shutil.rmtree, base) - os.symlink(os.path.join(base, 'b'), os.path.join(base, 'a/c/b')) - with tar(base) as archive: - tar_data = tarfile.open(fileobj=archive) - names = tar_data.getnames() - for member in dirs + files: - assert member in names - assert 'a/c/b' in names - assert 'a/c/b/utils.py' not in names - - class FormatEnvironmentTest(unittest.TestCase): def test_format_env_binary_unicode_value(self): env_dict = { |