diff options
Diffstat (limited to 'subversion/bindings/swig/python/tests/ra.py')
-rw-r--r-- | subversion/bindings/swig/python/tests/ra.py | 191 |
1 files changed, 93 insertions, 98 deletions
diff --git a/subversion/bindings/swig/python/tests/ra.py b/subversion/bindings/swig/python/tests/ra.py index 47917ca..3d00b2c 100644 --- a/subversion/bindings/swig/python/tests/ra.py +++ b/subversion/bindings/swig/python/tests/ra.py @@ -22,12 +22,7 @@ import unittest, setup_path from svn import core, repos, fs, delta, ra from sys import version_info # For Python version check -if version_info[0] >= 3: - # Python >=3.0 - from io import StringIO -else: - # Python <3.0 - from StringIO import StringIO +from io import BytesIO import utils @@ -58,16 +53,16 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): def test_get_file(self): # Test getting the properties of a file fs_revnum = fs.youngest_rev(self.fs) - rev, properties = ra.get_file(self.ra_ctx, "trunk/README2.txt", + rev, properties = ra.get_file(self.ra_ctx, b"trunk/README2.txt", core.SVN_INVALID_REVNUM, None) self.assertEqual(rev, fs_revnum) - self.assertEqual(properties["svn:mime-type"], "text/plain") + self.assertEqual(properties[b"svn:mime-type"], b"text/plain") # Test getting the contents of a file - filestream = StringIO() - rev, properties = ra.get_file(self.ra_ctx, "trunk/README2.txt", + filestream = BytesIO() + rev, properties = ra.get_file(self.ra_ctx, b"trunk/README2.txt", fs_revnum, filestream) - self.assertEqual("A test.\n", filestream.getvalue()) + self.assertEqual(b"A test.\n", filestream.getvalue()) def test_get_repos_root(self): root = ra.get_repos_root(self.ra_ctx) @@ -84,53 +79,53 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): self.assertEqual(ra_revnum, fs_revnum) def test_get_dir2(self): - (dirents, _, props) = ra.get_dir2(self.ra_ctx, '', 1, core.SVN_DIRENT_KIND) - self.assert_('trunk' in dirents) - self.assert_('branches' in dirents) - self.assert_('tags' in dirents) - self.assertEqual(dirents['trunk'].kind, core.svn_node_dir) - self.assertEqual(dirents['branches'].kind, core.svn_node_dir) - self.assertEqual(dirents['tags'].kind, core.svn_node_dir) - self.assert_(core.SVN_PROP_ENTRY_UUID in props) - self.assert_(core.SVN_PROP_ENTRY_LAST_AUTHOR in props) - - (dirents, _, _) = ra.get_dir2(self.ra_ctx, 'trunk', 1, core.SVN_DIRENT_KIND) + (dirents, _, props) = ra.get_dir2(self.ra_ctx, b'', 1, core.SVN_DIRENT_KIND) + self.assertTrue(b'trunk' in dirents) + self.assertTrue(b'branches' in dirents) + self.assertTrue(b'tags' in dirents) + self.assertEqual(dirents[b'trunk'].kind, core.svn_node_dir) + self.assertEqual(dirents[b'branches'].kind, core.svn_node_dir) + self.assertEqual(dirents[b'tags'].kind, core.svn_node_dir) + self.assertTrue(core.SVN_PROP_ENTRY_UUID in props) + self.assertTrue(core.SVN_PROP_ENTRY_LAST_AUTHOR in props) + + (dirents, _, _) = ra.get_dir2(self.ra_ctx, b'trunk', 1, core.SVN_DIRENT_KIND) self.assertEqual(dirents, {}) - (dirents, _, _) = ra.get_dir2(self.ra_ctx, 'trunk', 10, + (dirents, _, _) = ra.get_dir2(self.ra_ctx, b'trunk', 10, core.SVN_DIRENT_KIND) - self.assert_('README2.txt' in dirents) - self.assertEqual(dirents['README2.txt'].kind, core.svn_node_file) + self.assertTrue(b'README2.txt' in dirents) + self.assertEqual(dirents[b'README2.txt'].kind, core.svn_node_file) def test_commit3(self): commit_info = [] def my_callback(info, pool): commit_info.append(info) - revprops = {"svn:log": "foobar", "testprop": ""} + revprops = {b"svn:log": b"foobar", b"testprop": b""} editor, edit_baton = ra.get_commit_editor3(self.ra_ctx, revprops, my_callback, None, False) root = editor.open_root(edit_baton, 4) self.assertNotEqual(root, None) - child = editor.add_directory("bla3", root, None, 0) + child = editor.add_directory(b"bla3", root, None, 0) self.assertNotEqual(child, None) editor.close_edit(edit_baton) info = commit_info[0] self.assertEqual(info.revision, fs.youngest_rev(self.fs)) - revprops['svn:author'] = info.author - revprops['svn:date'] = info.date + revprops[b'svn:author'] = info.author + revprops[b'svn:date'] = info.date self.assertEqual(ra.rev_proplist(self.ra_ctx, info.revision), revprops) def test_commit2(self): def my_callback(info, pool): self.assertEqual(info.revision, fs.youngest_rev(self.fs)) - editor, edit_baton = ra.get_commit_editor2(self.ra_ctx, "foobar", my_callback, None, False) + editor, edit_baton = ra.get_commit_editor2(self.ra_ctx, b"foobar", my_callback, None, False) root = editor.open_root(edit_baton, 4) self.assertNotEqual(root, None) - child = editor.add_directory("bla", root, None, 0) + child = editor.add_directory(b"bla", root, None, 0) self.assertNotEqual(child, None) editor.close_edit(edit_baton) @@ -138,18 +133,18 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): def my_callback(revision, date, author): self.assertEqual(revision, fs.youngest_rev(self.fs)) - editor, edit_baton = ra.get_commit_editor(self.ra_ctx, "foobar", my_callback, None, False) + editor, edit_baton = ra.get_commit_editor(self.ra_ctx, b"foobar", my_callback, None, False) root = editor.open_root(edit_baton, 4) - child = editor.add_directory("blah", root, None, 0) + child = editor.add_directory(b"blah", root, None, 0) editor.close_edit(edit_baton) def test_delta_driver_commit(self): # Setup paths we'll commit in this test. - to_delete = ['trunk/README.txt', 'trunk/dir1/dir2'] - to_mkdir = ['test_delta_driver_commit.d', 'test_delta_driver_commit2.d'] - to_add = ['test_delta_driver_commit', 'test_delta_driver_commit2'] - to_dir_prop = ['trunk/dir1/dir3', 'test_delta_driver_commit2.d'] - to_file_prop = ['trunk/README2.txt', 'test_delta_driver_commit2'] + to_delete = [b'trunk/README.txt', b'trunk/dir1/dir2'] + to_mkdir = [b'test_delta_driver_commit.d', b'test_delta_driver_commit2.d'] + to_add = [b'test_delta_driver_commit', b'test_delta_driver_commit2'] + to_dir_prop = [b'trunk/dir1/dir3', b'test_delta_driver_commit2.d'] + to_file_prop = [b'trunk/README2.txt', b'test_delta_driver_commit2'] all_paths = {} for i in to_delete + to_mkdir + to_add + to_dir_prop + to_file_prop: all_paths[i] = True @@ -159,12 +154,12 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): commit_info = [] def commit_cb(info, pool): commit_info.append(info) - revprops = {"svn:log": "foobar", "testprop": ""} + revprops = {b"svn:log": b"foobar", b"testprop": b""} (editor, edit_baton) = ra.get_commit_editor3(self.ra_ctx, revprops, commit_cb, None, False) try: def driver_cb(parent, path, pool): - self.assert_(path in all_paths) + self.assertTrue(path in all_paths) dir_baton = file_baton = None if path in to_delete: # Leave dir_baton alone, as it must be None for delete. @@ -182,16 +177,16 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): if dir_baton is None: dir_baton = editor.open_directory(path, parent, revision, pool) editor.change_dir_prop(dir_baton, - 'test_delta_driver_commit', 'foo', pool) + b'test_delta_driver_commit', b'foo', pool) elif path in to_file_prop: if file_baton is None: file_baton = editor.open_file(path, parent, revision, pool) editor.change_file_prop(file_baton, - 'test_delta_driver_commit', 'foo', pool) + b'test_delta_driver_commit', b'foo', pool) if file_baton is not None: editor.close_file(file_baton, None, pool) return dir_baton - delta.path_driver(editor, edit_baton, -1, list(all_paths.keys()), driver_cb) + delta.path_driver(editor, edit_baton, -1, core._as_list(all_paths.keys()), driver_cb) editor.close_edit(edit_baton) except: try: @@ -204,8 +199,8 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): info = commit_info[0] if info.author is not None: - revprops['svn:author'] = info.author - revprops['svn:date'] = info.date + revprops[b'svn:author'] = info.author + revprops[b'svn:date'] = info.date self.assertEqual(ra.rev_proplist(self.ra_ctx, info.revision), revprops) receiver_called = [False] @@ -214,22 +209,22 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): self.assertEqual(revision, info.revision) self.assertEqual(author, info.author) self.assertEqual(date, info.date) - self.assertEqual(message, revprops['svn:log']) - for (path, change) in changed_paths.items(): - path = path.lstrip('/') - self.assert_(path in all_paths) + self.assertEqual(message, revprops[b'svn:log']) + for (path, change) in core._as_list(changed_paths.items()): + path = path.lstrip(b'/') + self.assertTrue(path in all_paths) if path in to_delete: - self.assertEqual(change.action, 'D') + self.assertEqual(change.action, b'D') elif path in to_mkdir or path in to_add: - self.assertEqual(change.action, 'A') + self.assertEqual(change.action, b'A') elif path in to_dir_prop or path in to_file_prop: - self.assertEqual(change.action, 'M') - ra.get_log(self.ra_ctx, [''], info.revision, info.revision, + self.assertEqual(change.action, b'M') + ra.get_log(self.ra_ctx, [b''], info.revision, info.revision, 0, # limit True, # discover_changed_paths True, # strict_node_history receiver) - self.assert_(receiver_called[0]) + self.assertTrue(receiver_called[0]) def test_do_diff2(self): @@ -251,26 +246,26 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): sess_url = ra.get_session_url(self.ra_ctx) try: - ra.reparent(self.ra_ctx, self.repos_uri+"/trunk") + ra.reparent(self.ra_ctx, self.repos_uri+b"/trunk") reporter, reporter_baton = ra.do_diff2(self.ra_ctx, fs_revnum, - "README.txt", 0, 0, 1, + b"README.txt", 0, 0, 1, self.repos_uri - +"/trunk/README.txt", + +b"/trunk/README.txt", e_ptr, e_baton) - reporter.set_path(reporter_baton, "", 0, True, None) + reporter.set_path(reporter_baton, b"", 0, True, None) reporter.finish_report(reporter_baton) finally: ra.reparent(self.ra_ctx, sess_url) - self.assertEqual("A test.\n", editor.textdeltas[0].new_data) + self.assertEqual(b"A test.\n", editor.textdeltas[0].new_data) self.assertEqual(1, len(editor.textdeltas)) def test_get_locations(self): - locations = ra.get_locations(self.ra_ctx, "trunk/README.txt", 2, list(range(1, 5))) + locations = ra.get_locations(self.ra_ctx, b"trunk/README.txt", 2, list(range(1, 5))) self.assertEqual(locations, { - 2: '/trunk/README.txt', - 3: '/trunk/README.txt', - 4: '/trunk/README.txt'}) + 2: b'/trunk/README.txt', + 3: b'/trunk/README.txt', + 4: b'/trunk/README.txt'}) def test_has_capability(self): self.assertEqual(True, ra.has_capability(self.ra_ctx, @@ -278,24 +273,24 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): def test_get_file_revs(self): def rev_handler(path, rev, rev_props, prop_diffs, pool): - self.assert_(rev == 2 or rev == 3) - self.assertEqual(path, "/trunk/README.txt") + self.assertTrue(rev == 2 or rev == 3) + self.assertEqual(path, b"/trunk/README.txt") if rev == 2: self.assertEqual(rev_props, { - 'svn:log': 'Added README.', - 'svn:author': 'john', - 'svn:date': '2005-04-01T13:12:18.216267Z' + b'svn:log': b'Added README.', + b'svn:author': b'john', + b'svn:date': b'2005-04-01T13:12:18.216267Z' }) self.assertEqual(prop_diffs, {}) elif rev == 3: self.assertEqual(rev_props, { - 'svn:log': 'Fixed README.\n', - 'svn:author': 'kate', - 'svn:date': '2005-04-01T13:24:58.234643Z' + b'svn:log': b'Fixed README.\n', + b'svn:author': b'kate', + b'svn:date': b'2005-04-01T13:24:58.234643Z' }) - self.assertEqual(prop_diffs, {'svn:mime-type': 'text/plain', 'svn:eol-style': 'native'}) + self.assertEqual(prop_diffs, {b'svn:mime-type': b'text/plain', b'svn:eol-style': b'native'}) - ra.get_file_revs(self.ra_ctx, "trunk/README.txt", 0, 10, rev_handler) + ra.get_file_revs(self.ra_ctx, b"trunk/README.txt", 0, 10, rev_handler) def test_lock(self): @@ -304,12 +299,12 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): self.errors = 0 def callback(path, do_lock, lock, ra_err, pool): self.calls += 1 - self.assertEqual(path, "trunk/README2.txt") + self.assertEqual(path, b"trunk/README2.txt") if lock: - self.assertEqual(lock.owner, "jrandom") + self.assertEqual(lock.owner, b"jrandom") self.locks += 1 if ra_err: - self.assert_(ra_err.apr_err == core.SVN_ERR_FS_PATH_ALREADY_LOCKED + self.assertTrue(ra_err.apr_err == core.SVN_ERR_FS_PATH_ALREADY_LOCKED or ra_err.apr_err == core.SVN_ERR_FS_NO_SUCH_LOCK) self.errors += 1 @@ -317,31 +312,31 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): self.callbacks.auth_baton = core.svn_auth_open(providers) core.svn_auth_set_parameter(self.callbacks.auth_baton, core.SVN_AUTH_PARAM_DEFAULT_USERNAME, - "jrandom") + b"jrandom") self.ra_ctx = ra.open2(self.repos_uri, self.callbacks, {}) rev = fs.youngest_rev(self.fs) - ra.lock(self.ra_ctx, {"trunk/README2.txt":rev}, "sleutel", False, callback) + ra.lock(self.ra_ctx, {b"trunk/README2.txt":rev}, b"sleutel", False, callback) self.assertEqual(self.calls, 1) self.assertEqual(self.locks, 1) self.assertEqual(self.errors, 0) self.calls = 0 self.locks = 0 - ra.lock(self.ra_ctx, {"trunk/README2.txt":rev}, "sleutel", False, callback) + ra.lock(self.ra_ctx, {b"trunk/README2.txt":rev}, b"sleutel", False, callback) self.assertEqual(self.calls, 1) self.assertEqual(self.locks, 0) self.assertEqual(self.errors, 1) self.calls = 0 self.errors = 0 - the_lock = fs.get_lock(self.fs, "/trunk/README2.txt") - ra.unlock(self.ra_ctx, {"trunk/README2.txt":the_lock.token}, False, callback) + the_lock = fs.get_lock(self.fs, b"/trunk/README2.txt") + ra.unlock(self.ra_ctx, {b"trunk/README2.txt":the_lock.token}, False, callback) self.assertEqual(self.calls, 1) self.assertEqual(self.locks, 0) self.assertEqual(self.errors, 0) self.calls = 0 - ra.unlock(self.ra_ctx, {"trunk/README2.txt":the_lock.token}, False, callback) + ra.unlock(self.ra_ctx, {b"trunk/README2.txt":the_lock.token}, False, callback) self.assertEqual(self.calls, 1) self.assertEqual(self.locks, 0) self.assertEqual(self.errors, 1) @@ -351,16 +346,16 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): self.test_commit3() rev = fs.youngest_rev(self.fs) revprops = ra.rev_proplist(self.ra_ctx, rev) - self.assert_("svn:log" in revprops) - self.assert_("testprop" in revprops) + self.assertTrue(b"svn:log" in revprops) + self.assertTrue(b"testprop" in revprops) def receiver(log_entry, pool): called[0] = True self.assertEqual(log_entry.revision, rev) if discover_changed_paths: - self.assertEqual(list(log_entry.changed_paths.keys()), ['/bla3']) - changed_path = log_entry.changed_paths['/bla3'] - self.assert_(changed_path.action in ['A', 'D', 'R', 'M']) + self.assertEqual(core._as_list(log_entry.changed_paths.keys()), [b'/bla3']) + changed_path = log_entry.changed_paths[b'/bla3'] + self.assertTrue(changed_path.action in [b'A', b'D', b'R', b'M']) self.assertEqual(changed_path.copyfrom_path, None) self.assertEqual(changed_path.copyfrom_rev, -1) else: @@ -368,7 +363,7 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): if log_revprops is None: self.assertEqual(log_entry.revprops, revprops) elif len(log_revprops) == 0: - self.assert_(log_entry.revprops == None or len(log_entry.revprops) == 0) + self.assertTrue(log_entry.revprops == None or len(log_entry.revprops) == 0) else: revprop_names = sorted(log_entry.revprops.keys()) log_revprops.sort() @@ -381,9 +376,9 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): for log_revprops in ( # Retrieve the standard three. - ["svn:author", "svn:date", "svn:log"], + [b"svn:author", b"svn:date", b"svn:log"], # Retrieve just testprop. - ["testprop"], + [b"testprop"], # Retrieve all. None, # Retrieve none. @@ -391,14 +386,14 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): ): for discover_changed_paths in [True, False]: called = [False] - ra.get_log2(self.ra_ctx, [""], + ra.get_log2(self.ra_ctx, [b""], rev, rev, # start, end 1, # limit discover_changed_paths, True, # strict_node_history False, # include_merged_revisions log_revprops, receiver) - self.assert_(called[0]) + self.assertTrue(called[0]) def test_update(self): class TestEditor(delta.Editor): @@ -408,23 +403,23 @@ class SubversionRepositoryAccessTestCase(unittest.TestCase): e_ptr, e_baton = delta.make_editor(editor) - reporter, reporter_baton = ra.do_update(self.ra_ctx, 10, "", True, e_ptr, e_baton) + reporter, reporter_baton = ra.do_update(self.ra_ctx, 10, b"", True, e_ptr, e_baton) - reporter.set_path(reporter_baton, "", 0, True, None) + reporter.set_path(reporter_baton, b"", 0, True, None) reporter.finish_report(reporter_baton) def test_namestring(self): # Only ra-{svn,serf} support this right now. uri = self.repos_uri - if uri.startswith('http') or uri.startswith('svn'): + if uri.startswith(b'http') or uri.startswith(b'svn'): called = [False] def cb(pool): called[0] = True - return 'namestring_test' + return b'namestring_test' self.callbacks.get_client_string = cb - ra.stat(self.ra_ctx, "", 1) - self.assert_(called[0]) + ra.stat(self.ra_ctx, b"", 1) + self.assertTrue(called[0]) def suite(): return unittest.defaultTestLoader.loadTestsFromTestCase( |