From c1b317d2a65ce63720cf3edd488e868532918add Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelmer=20Vernoo=C4=B3?= Date: Sun, 16 Jul 2017 21:31:45 +0000 Subject: Fix style issues. --- subvertpy/__init__.py | 13 ++- subvertpy/delta.py | 14 +-- subvertpy/marshall.py | 17 ++-- subvertpy/properties.py | 43 +++++---- subvertpy/ra.py | 13 +-- subvertpy/ra_svn.py | 178 +++++++++++++++++++++++-------------- subvertpy/server.py | 6 +- subvertpy/tests/__init__.py | 48 +++++----- subvertpy/tests/test_client.py | 19 ++-- subvertpy/tests/test_core.py | 6 +- subvertpy/tests/test_delta.py | 12 +-- subvertpy/tests/test_marshall.py | 12 +-- subvertpy/tests/test_properties.py | 151 +++++++++++++++++++++---------- subvertpy/tests/test_ra.py | 139 +++++++++++++++++++---------- subvertpy/tests/test_repos.py | 31 ++++--- subvertpy/tests/test_server.py | 6 +- subvertpy/tests/test_wc.py | 82 +++++++++-------- 17 files changed, 485 insertions(+), 305 deletions(-) (limited to 'subvertpy') diff --git a/subvertpy/__init__.py b/subvertpy/__init__.py index b588ca8c..6efcea11 100644 --- a/subvertpy/__init__.py +++ b/subvertpy/__init__.py @@ -92,10 +92,10 @@ AUTH_PARAM_DEFAULT_USERNAME = 'svn:auth:username' AUTH_PARAM_DEFAULT_PASSWORD = 'svn:auth:password' SSL_NOTYETVALID = 0x00000001 -SSL_EXPIRED = 0x00000002 -SSL_CNMISMATCH = 0x00000004 -SSL_UNKNOWNCA = 0x00000008 -SSL_OTHER = 0x40000000 +SSL_EXPIRED = 0x00000002 +SSL_CNMISMATCH = 0x00000004 +SSL_UNKNOWNCA = 0x00000008 +SSL_OTHER = 0x40000000 class SubversionException(Exception): @@ -109,7 +109,7 @@ class SubversionException(Exception): def _check_mtime(m): """Check whether a C extension is out of date. - + :param m: Python module that is a C extension """ import os @@ -121,6 +121,7 @@ def _check_mtime(m): return False return True + try: from subvertpy import client, _ra, repos, wc for x in client, _ra, repos, wc: @@ -130,5 +131,3 @@ try: break except ImportError as e: raise ImportError("Unable to load subvertpy extensions: %s" % e) - - diff --git a/subvertpy/delta.py b/subvertpy/delta.py index 9dbf7b58..0e354490 100644 --- a/subvertpy/delta.py +++ b/subvertpy/delta.py @@ -32,6 +32,7 @@ MAX_ENCODED_INT_LEN = 10 DELTA_WINDOW_SIZE = 102400 + def apply_txdelta_window(sbuf, window): """Apply a txdelta window to a buffer. @@ -60,9 +61,10 @@ def apply_txdelta_handler_chunks(source_chunks, target_chunks): :param target_stream: Target stream """ sbuf = bytes().join(source_chunks) + def apply_window(window): if window is None: - return # Last call + return # Last call target_chunks.append(apply_txdelta_window(sbuf, window)) return apply_window @@ -75,7 +77,7 @@ def apply_txdelta_handler(sbuf, target_stream): """ def apply_window(window): if window is None: - return # Last call + return # Last call target_stream.write(apply_txdelta_window(sbuf, window)) return apply_window @@ -136,16 +138,16 @@ def encode_length(len): # Count number of required bytes v = len >> 7 - n = 1; + n = 1 while v > 0: v = v >> 7 - n+=1 + n += 1 assert n <= MAX_ENCODED_INT_LEN ret = bytearray() while n > 0: - n-=1 + n -= 1 if n > 0: cont = 1 else: @@ -211,6 +213,7 @@ def unpack_svndiff_instruction(text): SVNDIFF0_HEADER = b"SVN\0" + def pack_svndiff0_window(window): """Pack an individual window using svndiff0. @@ -273,4 +276,3 @@ def unpack_svndiff0(text): newdata = text[:newdata_len] text = text[newdata_len:] yield (sview_offset, sview_len, tview_len, len(ops), ops, newdata) - diff --git a/subvertpy/marshall.py b/subvertpy/marshall.py index a5a73154..1970da41 100644 --- a/subvertpy/marshall.py +++ b/subvertpy/marshall.py @@ -16,6 +16,7 @@ """Marshalling for the svn_ra protocol.""" + class literal: """A protocol literal.""" @@ -30,10 +31,10 @@ class literal: # 1. Syntactic structure # ---------------------- -# +# # The Subversion protocol is specified in terms of the following # syntactic elements, specified using ABNF [RFC 2234]: -# +# # item = word / number / string / list # word = ALPHA *(ALPHA / DIGIT / "-") space # number = 1*DIGIT space @@ -41,7 +42,8 @@ class literal: # ; digits give the byte count of the *OCTET portion # list = "(" space *item ")" space # space = 1*(SP / LF) -# +# + class MarshallError(Exception): """A Marshall error.""" @@ -53,7 +55,7 @@ class NeedMoreData(MarshallError): def marshall(x): """Marshall a Python data item. - + :param x: Data item :return: encoded byte string """ @@ -69,9 +71,9 @@ def marshall(x): x = x.encode("utf-8") return ("%d:" % len(x)).encode("ascii") + x + b" " elif isinstance(x, bool): - if x == True: + if x is True: return b"true " - elif x == False: + elif x is False: return b"false " raise MarshallError("Unable to marshall type %s" % x) @@ -85,7 +87,7 @@ def unmarshall(x): whitespace = frozenset(b'\n ') if len(x) == 0: raise NeedMoreData("Not enough data") - if x[0:1] == b"(": # list follows + if x[0:1] == b"(": # list follows if len(x) <= 1: raise NeedMoreData("Missing whitespace") if x[1:2] != b" ": @@ -143,4 +145,3 @@ def unmarshall(x): return (x[1:], ret.decode("ascii")) else: raise MarshallError("Unexpected character '%c'" % x[0]) - diff --git a/subvertpy/properties.py b/subvertpy/properties.py index a86c2331..567f60cc 100644 --- a/subvertpy/properties.py +++ b/subvertpy/properties.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -18,7 +18,10 @@ __author__ = "Jelmer Vernooij " __docformat__ = "restructuredText" -import bisect, calendar, time + +import bisect +import calendar +import time try: import urlparse except ImportError: @@ -38,7 +41,7 @@ def is_valid_property_name(prop): if not prop[0].isalnum() and not prop[0] in ":_": return False for c in prop[1:]: - if not c.isalnum() and not c in "-:._": + if not c.isalnum() and c not in "-:._": return False return True @@ -50,14 +53,15 @@ def time_to_cstring(timestamp): :return: string with date """ tm_usec = timestamp % 1000000 - (tm_year, tm_mon, tm_mday, tm_hour, tm_min, - tm_sec, tm_wday, tm_yday, tm_isdst) = time.gmtime(timestamp / 1000000) - return "%04d-%02d-%02dT%02d:%02d:%02d.%06dZ" % (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_usec) + (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_wday, tm_yday, + tm_isdst) = time.gmtime(timestamp / 1000000) + return "%04d-%02d-%02dT%02d:%02d:%02d.%06dZ" % ( + tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_usec) def time_from_cstring(text): """Parse a time from a cstring. - + :param text: Parse text :return: number of microseconds since the start of 1970 """ @@ -71,11 +75,11 @@ def time_from_cstring(text): def parse_externals_description(base_url, val): """Parse an svn:externals property value. - :param base_url: URL on which the property is set. Used for + :param base_url: URL on which the property is set. Used for relative externals. :returns: dictionary with local names as keys, (revnum, url) - as value. revnum is the revision number and is + as value. revnum is the revision number and is set to None if not applicable. """ def is_url(u): @@ -84,24 +88,24 @@ def parse_externals_description(base_url, val): for l in val.splitlines(): if l == "" or l[0] == "#": continue - pts = l.rsplit(None, 3) + pts = l.rsplit(None, 3) if len(pts) == 4: - if pts[0] == "-r": # -r X URL DIR + if pts[0] == "-r": # -r X URL DIR revno = int(pts[1]) path = pts[3] relurl = pts[2] - elif pts[1] == "-r": # DIR -r X URL + elif pts[1] == "-r": # DIR -r X URL revno = int(pts[2]) path = pts[0] relurl = pts[3] else: raise InvalidExternalsDescription() elif len(pts) == 3: - if pts[1].startswith("-r"): # DIR -rX URL + if pts[1].startswith("-r"): # DIR -rX URL revno = int(pts[1][2:]) path = pts[0] relurl = pts[2] - elif pts[0].startswith("-r"): # -rX URL DIR + elif pts[0].startswith("-r"): # -rX URL DIR revno = int(pts[0][2:]) path = pts[2] relurl = pts[1] @@ -118,9 +122,11 @@ def parse_externals_description(base_url, val): else: raise InvalidExternalsDescription() if relurl.startswith("//"): - raise NotImplementedError("Relative to the scheme externals not yet supported") + raise NotImplementedError( + "Relative to the scheme externals not yet supported") if relurl.startswith("^/"): - raise NotImplementedError("Relative to the repository root externals not yet supported") + raise NotImplementedError( + "Relative to the repository root externals not yet supported") ret[path] = (revno, urlparse.urljoin(base_url+"/", relurl)) return ret @@ -274,13 +280,14 @@ PROP_REVISION_AUTHOR = "svn:author" PROP_REVISION_DATE = "svn:date" PROP_REVISION_ORIGINAL_DATE = "svn:original-date" + def diff(current, previous): """Find the differences between two property dictionaries. :param current: Dictionary with current (new) properties :param previous: Dictionary with previous (old) properties - :return: Dictionary that contains an entry for - each property that was changed. Value is a tuple + :return: Dictionary that contains an entry for + each property that was changed. Value is a tuple with the old and the new property value. """ ret = {} diff --git a/subvertpy/ra.py b/subvertpy/ra.py index 9963edda..95011702 100644 --- a/subvertpy/ra.py +++ b/subvertpy/ra.py @@ -17,11 +17,11 @@ __author__ = "Jelmer Vernooij " -from subvertpy import SubversionException, ERR_BAD_URL +from subvertpy import SubversionException, ERR_BAD_URL from subvertpy import _ra -from subvertpy._ra import * -from subvertpy import ra_svn +from subvertpy._ra import * # noqa: F403,F401 +from subvertpy import ra_svn # noqa: F401 try: from urllib2 import splittype @@ -30,14 +30,15 @@ except ImportError: url_handlers = { "svn": _ra.RemoteAccess, -# "svn": ra_svn.Client, + # "svn": ra_svn.Client, "svn+ssh": _ra.RemoteAccess, -# "svn+ssh": ra_svn.Client, + # "svn+ssh": ra_svn.Client, "http": _ra.RemoteAccess, "https": _ra.RemoteAccess, "file": _ra.RemoteAccess, } + def RemoteAccess(url, *args, **kwargs): """Connect to a remote Subversion server @@ -47,6 +48,6 @@ def RemoteAccess(url, *args, **kwargs): if isinstance(url, bytes): url = url.decode("utf-8") (type, opaque) = splittype(url) - if not type in url_handlers: + if type not in url_handlers: raise SubversionException("Unknown URL type '%s'" % type, ERR_BAD_URL) return url_handlers[type](url, *args, **kwargs) diff --git a/subvertpy/ra_svn.py b/subvertpy/ra_svn.py index 7b92bfeb..3e61e283 100644 --- a/subvertpy/ra_svn.py +++ b/subvertpy/ra_svn.py @@ -122,7 +122,7 @@ class SVNConnection(object): except NeedMoreData: newdata = self.recv_fn(1) if newdata != "": - #self.mutter("IN: %r" % newdata) + # self.mutter("IN: %r" % newdata) self.inbuffer += newdata def send_msg(self, data): @@ -158,7 +158,8 @@ def feed_editor(conn, editor): if len(args[3]) == 0: token = tokens[args[1]].add_directory(args[0]) else: - token = tokens[args[1]].add_directory(args[0], args[3][0], args[4][0]) + token = tokens[args[1]].add_directory( + args[0], args[3][0], args[4][0]) tokens[args[2]] = token elif command == "open-dir": tokens[args[2]] = tokens[args[1]].open_directory(args[0], args[3]) @@ -175,15 +176,18 @@ def feed_editor(conn, editor): if len(args[3]) == 0: token = tokens[args[1]].add_file(args[0]) else: - token = tokens[args[1]].add_file(args[0], args[3][0], args[4][0]) + token = tokens[args[1]].add_file( + args[0], args[3][0], args[4][0]) tokens[args[2]] = token elif command == "open-file": tokens[args[2]] = tokens[args[1]].open_file(args[0], args[3]) elif command == "apply-textdelta": if len(args[1]) == 0: - txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(None) + txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta( + None) else: - txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(args[1][0]) + txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta( + args[1][0]) diff[args[0]] = "" elif command == "textdelta-chunk": diff[args[0]] += args[1] @@ -220,7 +224,7 @@ class Reporter(object): self.conn = conn self.editor = editor - def set_path(self, path, rev, start_empty=False, lock_token=None, + def set_path(self, path, rev, start_empty=False, lock_token=None, depth=None): args = [path, rev, start_empty] if lock_token is not None: @@ -235,8 +239,8 @@ class Reporter(object): def delete_path(self, path): self.conn.send_msg([literal("delete-path"), [path]]) - def link_path(self, path, url, rev, start_empty=False, lock_token=None, - depth=None): + def link_path(self, path, url, rev, start_empty=False, lock_token=None, + depth=None): args = [path, url, rev, start_empty] if lock_token is not None: args.append([lock_token]) @@ -249,7 +253,7 @@ class Reporter(object): def finish(self): self.conn.send_msg([literal("finish-report"), []]) - auth = self.conn.recv_msg() + self.conn.recv_msg() feed_editor(self.conn, self.editor) self.conn.busy = False @@ -301,18 +305,21 @@ class DirectoryEditor(object): copyfrom_data = [copyfrom_path, copyfrom_rev] else: copyfrom_data = [] - self.conn.send_msg([literal("add-file"), [path, self.id, child, copyfrom_data]]) + self.conn.send_msg([literal("add-file"), + [path, self.id, child, copyfrom_data]]) return FileEditor(self.conn, child) def open_file(self, path, base_revnum): self._is_last_open() child = generate_random_id() - self.conn.send_msg([literal("open-file"), [path, self.id, child, base_revnum]]) + self.conn.send_msg([literal("open-file"), + [path, self.id, child, base_revnum]]) return FileEditor(self.conn, child) def delete_entry(self, path, base_revnum): self._is_last_open() - self.conn.send_msg([literal("delete-entry"), [path, base_revnum, self.id]]) + self.conn.send_msg([literal("delete-entry"), + [path, base_revnum, self.id]]) def add_directory(self, path, copyfrom_path=None, copyfrom_rev=-1): self._is_last_open() @@ -321,13 +328,15 @@ class DirectoryEditor(object): copyfrom_data = [copyfrom_path, copyfrom_rev] else: copyfrom_data = [] - self.conn.send_msg([literal("add-dir"), [path, self.id, child, copyfrom_data]]) + self.conn.send_msg([literal("add-dir"), + [path, self.id, child, copyfrom_data]]) return DirectoryEditor(self.conn, child) def open_directory(self, path, base_revnum): self._is_last_open() child = generate_random_id() - self.conn.send_msg([literal("open-dir"), [path, self.id, child, base_revnum]]) + self.conn.send_msg([literal("open-dir"), + [path, self.id, child, base_revnum]]) return DirectoryEditor(self.conn, child) def change_prop(self, name, value): @@ -336,7 +345,8 @@ class DirectoryEditor(object): value = [] else: value = [value] - self.conn.send_msg([literal("change-dir-prop"), [self.id, name, value]]) + self.conn.send_msg([literal("change-dir-prop"), + [self.id, name, value]]) def _is_last_open(self): assert self.conn._open_ids[-1] == self.id @@ -374,13 +384,17 @@ class FileEditor(object): base_check = [] else: base_check = [base_checksum] - self.conn.send_msg([literal("apply-textdelta"), [self.id, base_check]]) - self.conn.send_msg([literal("textdelta-chunk"), [self.id, SVNDIFF0_HEADER]]) + self.conn.send_msg([literal("apply-textdelta"), + [self.id, base_check]]) + self.conn.send_msg([literal("textdelta-chunk"), + [self.id, SVNDIFF0_HEADER]]) + def send_textdelta(delta): if delta is None: self.conn.send_msg([literal("textdelta-end"), [self.id]]) else: - self.conn.send_msg([literal("textdelta-chunk"), [self.id, pack_svndiff0_window(delta)]]) + self.conn.send_msg([literal("textdelta-chunk"), + [self.id, pack_svndiff0_window(delta)]]) return send_textdelta def change_prop(self, name, value): @@ -389,11 +403,12 @@ class FileEditor(object): value = [] else: value = [value] - self.conn.send_msg([literal("change-file-prop"), [self.id, name, value]]) + self.conn.send_msg([literal("change-file-prop"), + [self.id, name, value]]) def mark_busy(unbound): - + def convert(self, *args, **kwargs): self.busy = True try: @@ -406,6 +421,7 @@ def mark_busy(unbound): convert.__name__ = unbound.__name__ return convert + def unmarshall_dirent(d): ret = { "name": d[0], @@ -423,7 +439,7 @@ def unmarshall_dirent(d): class SVNClient(SVNConnection): - def __init__(self, url, progress_cb=None, auth=None, config=None, + def __init__(self, url, progress_cb=None, auth=None, config=None, client_string_func=None, open_tmp_file_func=None): self.url = url (type, opaque) = urlparse.splittype(url) @@ -439,12 +455,19 @@ class SVNClient(SVNConnection): else: (recv_func, send_func) = self._connect_ssh(host) super(SVNClient, self).__init__(recv_func, send_func) - (min_version, max_version, _, self._server_capabilities) = self._recv_greeting() - self.send_msg([max_version, [literal(x) for x in CAPABILITIES if x in self._server_capabilities], self.url]) + (min_version, max_version, _, self._server_capabilities) = ( + self._recv_greeting()) + self.send_msg( + [max_version, + [literal(x) for x in CAPABILITIES + if x in self._server_capabilities], + self.url]) (self._server_mechanisms, mech_arg) = self._unpack() if self._server_mechanisms != []: # FIXME: Support other mechanisms as well - self.send_msg([literal("ANONYMOUS"), [base64.b64encode("anonymous@%s" % socket.gethostname())]]) + self.send_msg([literal("ANONYMOUS"), + [base64.b64encode( + "anonymous@%s" % socket.gethostname())]]) self.recv_msg() msg = self._unpack() if len(msg) > 2: @@ -475,9 +498,11 @@ class SVNClient(SVNConnection): def _connect(self, host): (host, port) = urlparse.splitnport(host, SVN_PORT) - sockaddrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, + sockaddrs = socket.getaddrinfo( + host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 0) self._socket = None + err = RuntimeError('no addresses for %s:%s' % (host, port)) for (family, socktype, proto, canonname, sockaddr) in sockaddrs: try: self._socket = socket.socket(family, socktype, proto) @@ -500,7 +525,8 @@ class SVNClient(SVNConnection): else: password = None (host, port) = urlparse.splitnport(host, 22) - self._tunnel = get_ssh_vendor().connect_ssh(user, password, host, port, ["svnserve", "-t"]) + self._tunnel = get_ssh_vendor().connect_ssh( + user, password, host, port, ["svnserve", "-t"]) return (self._tunnel.recv, self._tunnel.send) def get_file_revs(self, path, start, end, file_rev_handler): @@ -508,7 +534,8 @@ class SVNClient(SVNConnection): @mark_busy def get_locations(self, path, peg_revision, location_revisions): - self.send_msg([literal("get-locations"), [path, peg_revision, location_revisions]]) + self.send_msg([literal("get-locations"), [path, peg_revision, + location_revisions]]) self._recv_ack() ret = {} while True: @@ -530,10 +557,11 @@ class SVNClient(SVNConnection): def unlock(self, path_tokens, break_lock, lock_func): raise NotImplementedError(self.unlock) - def mergeinfo(self, paths, revision=-1, inherit=None, include_descendants=False): + def mergeinfo(self, paths, revision=-1, inherit=None, + include_descendants=False): raise NotImplementedError(self.mergeinfo) - def location_segments(self, path, start_revision, end_revision, + def location_segments(self, path, start_revision, end_revision, include_merged_revisions=False): args = [path] if start_revision is None or start_revision == -1: @@ -571,7 +599,8 @@ class SVNClient(SVNConnection): self.send_msg([literal("check-path"), args]) self._recv_ack() ret = self._unpack()[0] - return {"dir": NODE_DIR, "file": NODE_FILE, "unknown": NODE_UNKNOWN, "none": NODE_NONE}[ret] + return {"dir": NODE_DIR, "file": NODE_FILE, "unknown": NODE_UNKNOWN, + "none": NODE_NONE}[ret] def get_lock(self, path): self.send_msg([literal("get-lock"), [path]]) @@ -583,13 +612,14 @@ class SVNClient(SVNConnection): return ret[0] @mark_busy - def get_dir(self, path, revision=-1, dirent_fields=0, want_props=True, want_contents=True): + def get_dir(self, path, revision=-1, dirent_fields=0, want_props=True, + want_contents=True): args = [path] if revision is None or revision == -1: args.append([]) else: args.append([revision]) - + args += [want_props, want_contents] fields = [] @@ -646,7 +676,7 @@ class SVNClient(SVNConnection): self._recv_ack() self._unparse() - def get_commit_editor(self, revprops, callback=None, lock_tokens=None, + def get_commit_editor(self, revprops, callback=None, lock_tokens=None, keep_locks=False): args = [revprops[properties.PROP_REVISION_LOG]] if lock_tokens is not None: @@ -675,16 +705,19 @@ class SVNClient(SVNConnection): return ret[0] @mark_busy - def replay(self, revision, low_water_mark, update_editor, send_deltas=True): - self.send_msg([literal("replay"), [revision, low_water_mark, send_deltas]]) + def replay(self, revision, low_water_mark, update_editor, + send_deltas=True): + self.send_msg([literal("replay"), [revision, low_water_mark, + send_deltas]]) self._recv_ack() feed_editor(self, update_editor) self._unpack() @mark_busy - def replay_range(self, start_revision, end_revision, low_water_mark, cbs, + def replay_range(self, start_revision, end_revision, low_water_mark, cbs, send_deltas=True): - self.send_msg([literal("replay-range"), [start_revision, end_revision, low_water_mark, send_deltas]]) + self.send_msg([literal("replay-range"), [start_revision, end_revision, + low_water_mark, send_deltas]]) self._recv_ack() for i in range(start_revision, end_revision+1): msg = self.recv_msg() @@ -694,7 +727,7 @@ class SVNClient(SVNConnection): cbs[1](i, dict(msg[1]), edit) self._unpack() - def do_switch(self, revision_to_update_to, update_target, recurse, + def do_switch(self, revision_to_update_to, update_target, recurse, switch_url, update_editor, depth=None): args = [] if revision_to_update_to is None or revision_to_update_to == -1: @@ -716,7 +749,7 @@ class SVNClient(SVNConnection): self.busy = False raise - def do_update(self, revision_to_update_to, update_target, recurse, + def do_update(self, revision_to_update_to, update_target, recurse, update_editor, depth=None): args = [] if revision_to_update_to is None or revision_to_update_to == -1: @@ -738,13 +771,15 @@ class SVNClient(SVNConnection): raise def do_diff(self, revision_to_update, diff_target, versus_url, diff_editor, - recurse=True, ignore_ancestry=False, text_deltas=False, depth=None): + recurse=True, ignore_ancestry=False, text_deltas=False, + depth=None): args = [] if revision_to_update is None or revision_to_update == -1: args.append([]) else: args.append([revision_to_update]) - args += [diff_target, recurse, ignore_ancestry, versus_url, text_deltas] + args += [diff_target, recurse, ignore_ancestry, versus_url, + text_deltas] if depth is not None: args.append(literal(depth)) self.busy = True @@ -782,9 +817,9 @@ class SVNClient(SVNConnection): return self._uuid @mark_busy - def log(self, paths, start, end, limit=0, - discover_changed_paths=True, strict_node_history=True, - include_merged_revisions=True, revprops=None): + def log(self, paths, start, end, limit=0, discover_changed_paths=True, + strict_node_history=True, include_merged_revisions=True, + revprops=None): args = [paths] if start is None or start == -1: args.append([]) @@ -825,7 +860,8 @@ class SVNClient(SVNConnection): if len(msg) > 6 and msg[6]: revno = None else: - revno = msg[1] + revno = msg[1] # noqa: F841 + # TODO(jelmer): Do something with revno revprops = {} if len(msg[2]) != 0: revprops[properties.PROP_REVISION_AUTHOR] = msg[2][0] @@ -845,7 +881,7 @@ class SVNClient(SVNConnection): callback(paths, rev, props) else: callback(paths, rev, props, has_children) - + MIN_VERSION = 2 MAX_VERSION = 2 @@ -861,9 +897,8 @@ class SVNServer(SVNConnection): self._logf = logf super(SVNServer, self).__init__(recv_fn, send_fn) - def send_greeting(self): self.send_success( - MIN_VERSION, MAX_VERSION, [literal(x) for x in MECHANISMS], + MIN_VERSION, MAX_VERSION, [literal(x) for x in MECHANISMS], [literal(x) for x in CAPABILITIES]) def send_mechs(self): @@ -876,8 +911,9 @@ class SVNServer(SVNConnection): self.send_success([], "") def send_unknown(self, cmd): - self.send_failure([ERR_RA_SVN_UNKNOWN_CMD, - "Unknown command '%s'" % cmd, __file__, 52]) + self.send_failure( + [ERR_RA_SVN_UNKNOWN_CMD, + "Unknown command '%s'" % cmd, __file__, 52]) def get_latest_rev(self): self.send_ack() @@ -890,13 +926,14 @@ class SVNServer(SVNConnection): revnum = rev[0] kind = self.repo_backend.check_path(path, revnum) self.send_ack() - self.send_success(literal({NODE_NONE: "none", - NODE_DIR: "dir", - NODE_FILE: "file", - NODE_UNKNOWN: "unknown"}[kind])) - - def log(self, target_path, start_rev, end_rev, changed_paths, - strict_node, limit=None, include_merged_revisions=False, + self.send_success(literal({ + NODE_NONE: "none", + NODE_DIR: "dir", + NODE_FILE: "file", + NODE_UNKNOWN: "unknown"}[kind])) + + def log(self, target_path, start_rev, end_rev, changed_paths, + strict_node, limit=None, include_merged_revisions=False, all_revprops=None, revprops=None): def send_revision(revno, author, date, message, changed_paths=None): changes = [] @@ -916,14 +953,15 @@ class SVNServer(SVNConnection): end_revnum = None else: end_revnum = end_rev[0] - self.repo_backend.log(send_revision, target_path, start_revnum, + self.repo_backend.log(send_revision, target_path, start_revnum, end_revnum, changed_paths, strict_node, limit) self.send_msg(literal("done")) self.send_success() def open_backend(self, url): (rooturl, location) = urlparse.splithost(url) - self.repo_backend, self.relpath = self.backend.open_repository(location) + self.repo_backend, self.relpath = self.backend.open_repository( + location) def reparent(self, parent): self.open_backend(parent) @@ -941,7 +979,7 @@ class SVNServer(SVNConnection): self.send_success([]) else: args = [dirent["name"], dirent["kind"], dirent["size"], - dirent["has-props"], dirent["created-rev"]] + dirent["has-props"], dirent["created-rev"]] if "created-date" in dirent: args.append([dirent["created-date"]]) else: @@ -953,8 +991,8 @@ class SVNServer(SVNConnection): self.send_success([args]) def commit(self, logmsg, locks, keep_locks=False, rev_props=None): - self.send_failure([ERR_UNSUPPORTED_FEATURE, - "commit not yet supported", __file__, 42]) + self.send_failure([ERR_UNSUPPORTED_FEATURE, + "commit not yet supported", __file__, 42]) def rev_proplist(self, revnum): self.send_ack() @@ -977,7 +1015,8 @@ class SVNServer(SVNConnection): self.send_msg(literal("done")) self.send_success() - def update(self, rev, target, recurse, depth=None, send_copyfrom_param=True): + def update(self, rev, target, recurse, depth=None, + send_copyfrom_param=True): self.send_ack() while True: msg = self.recv_msg() @@ -997,7 +1036,8 @@ class SVNServer(SVNConnection): if client_result[0] == "success": return else: - self.mutter("Client reported error during update: %r" % client_result) + self.mutter("Client reported error during update: %r" % + client_result) # Needs to be sent back to the client to display self.send_failure(client_result[1][0]) @@ -1053,7 +1093,7 @@ class SVNServer(SVNConnection): # Expect: while not self._stop: - ( cmd, args ) = self.recv_msg() + (cmd, args) = self.recv_msg() if cmd not in self.commands: self.mutter("client used unknown command %r" % cmd) self.send_unknown(cmd) @@ -1073,11 +1113,12 @@ class TCPSVNRequestHandler(StreamRequestHandler): def __init__(self, request, client_address, server): self._server = server - StreamRequestHandler.__init__(self, request, - client_address, server) + StreamRequestHandler.__init__( + self, request, client_address, server) def handle(self): - server = SVNServer(self._server._backend, self.rfile.read, + server = SVNServer( + self._server._backend, self.rfile.read, self.wfile.write, self._server._logf) try: server.serve() @@ -1096,4 +1137,3 @@ class TCPSVNServer(TCPServer): self._logf = logf self._backend = backend TCPServer.__init__(self, addr, TCPSVNRequestHandler) - diff --git a/subvertpy/server.py b/subvertpy/server.py index e6ab2f4f..02283eab 100644 --- a/subvertpy/server.py +++ b/subvertpy/server.py @@ -16,6 +16,7 @@ """Server backend base classes.""" + class ServerBackend(object): """A server backend.""" @@ -30,7 +31,7 @@ def generate_random_id(): class ServerRepositoryBackend(object): - + def get_uuid(self): raise NotImplementedError(self.get_uuid) @@ -60,6 +61,3 @@ class ServerRepositoryBackend(object): def get_locations(self, path, peg_revnum, revnums): raise NotImplementedError(self.get_locations) - - - diff --git a/subvertpy/tests/__init__.py b/subvertpy/tests/__init__.py index c668bc8e..7fb1e008 100644 --- a/subvertpy/tests/__init__.py +++ b/subvertpy/tests/__init__.py @@ -54,8 +54,9 @@ def rmtree_with_readonly(path): In Windows a read-only file cannot be removed, and shutil.rmtree fails. """ def force_rm_handle(remove_path, path, excinfo): - os.chmod(path, os.stat(path).st_mode | stat.S_IWUSR | stat.S_IWGRP | - stat.S_IWOTH) + os.chmod( + path, + os.stat(path).st_mode | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) remove_path(path) shutil.rmtree(path, onerror=force_rm_handle) @@ -69,8 +70,9 @@ class TestCase(unittest.TestCase): def assertIsInstance(self, obj, kls, msg=None): """Fail if obj is not an instance of kls""" if not isinstance(obj, kls): - if msg is None: msg = "%r is an instance of %s rather than %s" % ( - obj, obj.__class__, kls) + if msg is None: + msg = "%r is an instance of %s rather than %s" % ( + obj, obj.__class__, kls) self.fail(msg) def assertIs(self, left, right, message=None): @@ -145,7 +147,8 @@ class TestDirEditor(object): def open_dir(self, path): self.close_children() - child = TestDirEditor(self.dir.open_directory(path, -1), self.baseurl, self.revnum) + child = TestDirEditor(self.dir.open_directory(path, -1), self.baseurl, + self.revnum) self.children.append(child) return child @@ -164,7 +167,7 @@ class TestDirEditor(object): assert (copyfrom_path is None and copyfrom_rev == -1) or \ (copyfrom_path is not None and copyfrom_rev > -1) child = TestDirEditor(self.dir.add_directory(path, copyfrom_path, - copyfrom_rev), self.baseurl, self.revnum) + copyfrom_rev), self.baseurl, self.revnum) self.children.append(child) return child @@ -175,7 +178,7 @@ class TestDirEditor(object): if copyfrom_path is not None and copyfrom_rev == -1: copyfrom_rev = self.revnum child = TestFileEditor(self.dir.add_file(path, copyfrom_path, - copyfrom_rev)) + copyfrom_rev)) self.children.append(child) return child @@ -208,7 +211,7 @@ class SubversionTestCase(TestCaseInTempDir): ra.get_ssl_client_cert_pw_file_provider(), ra.get_ssl_server_trust_file_provider()]) self.client_ctx.log_msg_func = self.log_message_func - #self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err) + # self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err) def setUp(self): super(SubversionTestCase, self).setUp() @@ -232,16 +235,16 @@ class SubversionTestCase(TestCaseInTempDir): if allow_revprop_changes: if sys.platform == 'win32': - revprop_hook = os.path.join(abspath, "hooks", - "pre-revprop-change.bat") + revprop_hook = os.path.join( + abspath, "hooks", "pre-revprop-change.bat") f = open(revprop_hook, 'w') try: f.write("exit 0\n") finally: f.close() else: - revprop_hook = os.path.join(abspath, "hooks", - "pre-revprop-change") + revprop_hook = os.path.join( + abspath, "hooks", "pre-revprop-change") f = open(revprop_hook, 'w') try: f.write("#!/bin/sh\n") @@ -254,7 +257,6 @@ class SubversionTestCase(TestCaseInTempDir): else: return "file://%s" % abspath - def make_checkout(self, repos_url, relpath): """Create a new checkout.""" self.client_ctx.checkout(repos_url, relpath, "HEAD") @@ -334,15 +336,17 @@ class SubversionTestCase(TestCaseInTempDir): r = ra.RemoteAccess(url) assert isinstance(url, str) ret = {} + def rcvr(orig_paths, rev, revprops, has_children=None): - ret[rev] = (orig_paths, + ret[rev] = ( + orig_paths, revprops.get(properties.PROP_REVISION_AUTHOR), revprops.get(properties.PROP_REVISION_DATE), revprops.get(properties.PROP_REVISION_LOG)) r.get_log(rcvr, [""], start_revnum, stop_revnum, 0, True, True, revprops=[properties.PROP_REVISION_AUTHOR, - properties.PROP_REVISION_DATE, - properties.PROP_REVISION_LOG]) + properties.PROP_REVISION_DATE, + properties.PROP_REVISION_LOG]) return ret def client_delete(self, relpath): @@ -402,8 +406,8 @@ class SubversionTestCase(TestCaseInTempDir): :param clientpath: Path to checkout :return: Repository URL. """ - repos_url = self.make_repository(repospath, - allow_revprop_changes=allow_revprop_changes) + repos_url = self.make_repository( + repospath, allow_revprop_changes=allow_revprop_changes) self.make_checkout(repos_url, clientpath) return repos_url @@ -421,11 +425,11 @@ class SubversionTestCase(TestCaseInTempDir): :param message: Commit message :return: Commit editor object """ - ra_ctx = RemoteAccess(url.encode("utf-8"), - auth=Auth([ra.get_username_provider()])) + ra_ctx = RemoteAccess( + url.encode("utf-8"), auth=Auth([ra.get_username_provider()])) revnum = ra_ctx.get_latest_revnum() - return TestCommitEditor(ra_ctx.get_commit_editor({"svn:log": message}), - ra_ctx.url, revnum) + return TestCommitEditor(ra_ctx.get_commit_editor( + {"svn:log": message}), ra_ctx.url, revnum) def test_suite(): diff --git a/subvertpy/tests/test_client.py b/subvertpy/tests/test_client.py index f7566422..a1a0534d 100644 --- a/subvertpy/tests/test_client.py +++ b/subvertpy/tests/test_client.py @@ -74,8 +74,9 @@ class TestClient(SubversionTestCase): def test_commit_start(self): self.build_tree({"dc/foo": None}) - self.client = client.Client(auth=ra.Auth([ra.get_username_provider()]), - log_msg_func=lambda c: "Bmessage") + self.client = client.Client( + auth=ra.Auth([ra.get_username_provider()]), + log_msg_func=lambda c: "Bmessage") self.client.add("dc/foo") self.client.commit(["dc"]) r = ra.RemoteAccess(self.repos_url) @@ -98,7 +99,8 @@ class TestClient(SubversionTestCase): self.build_tree({"dc/foo": b"bla"}) self.client.add("dc/foo") self.client.commit(["dc"]) - self.client.export(self.repos_url, "de", ignore_externals=True, ignore_keywords=True) + self.client.export(self.repos_url, "de", ignore_externals=True, + ignore_keywords=True) self.assertEqual(["foo"], os.listdir("de")) def test_add_recursive(self): @@ -135,8 +137,6 @@ class TestClient(SubversionTestCase): shutil.rmtree(base_dir) def test_diff(self): - r = ra.RemoteAccess(self.repos_url, - auth=ra.Auth([ra.get_username_provider()])) dc = self.get_commit_editor(self.repos_url) f = dc.add_file("foo") f.modify(b"foo1") @@ -148,9 +148,10 @@ class TestClient(SubversionTestCase): dc.close() if client.api_version() < (1, 5): - self.assertRaises(NotImplementedError, self.client.diff, 1, 2, + self.assertRaises( + NotImplementedError, self.client.diff, 1, 2, self.repos_url, self.repos_url) - return # Skip test + return # Skip test (outf, errf) = self.client.diff(1, 2, self.repos_url, self.repos_url) self.addCleanup(outf.close) @@ -203,6 +204,7 @@ class TestClient(SubversionTestCase): commit_msg_1 = b"Commit" commit_msg_2 = b"Commit 2" delta = timedelta(hours=1) + def cb(changed_paths, revision, revprops, has_children=False): log_entries.append({ 'changed_paths': changed_paths, @@ -241,7 +243,8 @@ class TestClient(SubversionTestCase): self.assertLogEntryMessageEquals(commit_msg_1, log_entries[1]) self.assertLogEntryDateAlmostEquals(commit_1_dt, log_entries[1], delta) log_entries = [] - self.client.log(cb, "dc/foo", start_rev=2, end_rev=2, discover_changed_paths=True) + self.client.log(cb, "dc/foo", start_rev=2, end_rev=2, + discover_changed_paths=True) self.assertEqual(1, len(log_entries)) self.assertLogEntryChangedPathsEquals(["/foo", "/bar"], log_entries[0]) self.assertEqual(2, log_entries[0]["revision"]) diff --git a/subvertpy/tests/test_core.py b/subvertpy/tests/test_core.py index fda776c6..f608aca3 100644 --- a/subvertpy/tests/test_core.py +++ b/subvertpy/tests/test_core.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -18,10 +18,12 @@ import subvertpy from subvertpy.tests import TestCase + class TestCore(TestCase): def setUp(self): super(TestCore, self).setUp() def test_exc(self): - self.assertTrue(isinstance(subvertpy.SubversionException("foo", 1), Exception)) + self.assertTrue( + isinstance(subvertpy.SubversionException("foo", 1), Exception)) diff --git a/subvertpy/tests/test_delta.py b/subvertpy/tests/test_delta.py index 4dbec0dd..20853d1e 100644 --- a/subvertpy/tests/test_delta.py +++ b/subvertpy/tests/test_delta.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -28,6 +28,7 @@ from subvertpy.delta import ( ) from subvertpy.tests import TestCase + class DeltaTests(TestCase): def setUp(self): @@ -40,8 +41,8 @@ class DeltaTests(TestCase): def test_send_stream(self): stream = BytesIO(b"foo") send_stream(stream, self.storing_window_handler) - self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None], - self.windows) + self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None], + self.windows) def test_apply_delta(self): stream = BytesIO() @@ -74,7 +75,8 @@ class MarshallTests(TestCase): def test_roundtrip_length(self): self.assertEqual((42, bytes()), decode_length(encode_length(42))) - def test_roundtrip_window(self): mywindow = (0, 0, 3, 1, [(2, 0, 3)], b'foo') - self.assertEqual([mywindow], list(unpack_svndiff0(pack_svndiff0([mywindow])))) + self.assertEqual( + [mywindow], + list(unpack_svndiff0(pack_svndiff0([mywindow])))) diff --git a/subvertpy/tests/test_marshall.py b/subvertpy/tests/test_marshall.py index 5ccaa980..751276f2 100644 --- a/subvertpy/tests/test_marshall.py +++ b/subvertpy/tests/test_marshall.py @@ -24,6 +24,7 @@ from subvertpy.marshall import ( ) from subvertpy.tests import TestCase + class TestMarshalling(TestCase): def test_literal_txt(self): @@ -41,15 +42,15 @@ class TestMarshalling(TestCase): def test_marshall_error(self): e = MarshallError("bla bla") self.assertEqual("bla bla", e.__str__()) - + def test_marshall_int(self): self.assertEqual(b"1 ", marshall(1)) def test_marshall_list(self): - self.assertEqual(b"( 1 2 3 4 ) ", marshall([1,2,3,4])) - + self.assertEqual(b"( 1 2 3 4 ) ", marshall([1, 2, 3, 4])) + def test_marshall_list_mixed(self): - self.assertEqual(b"( 1 3 4 3:str ) ", marshall([1,3,4,"str"])) + self.assertEqual(b"( 1 3 4 3:str ) ", marshall([1, 3, 4, "str"])) def test_marshall_literal(self): self.assertEqual(b"foo ", marshall(literal("foo"))) @@ -87,9 +88,8 @@ class TestMarshalling(TestCase): def test_unmarshall_toolong(self): self.assertRaises(MarshallError, unmarshall, b"43432432:bla") - def test_unmarshall_literal(self): + def test_unmarshall_literal_negative(self): self.assertRaises(MarshallError, unmarshall, b":-3213") def test_unmarshall_open_list(self): self.assertRaises(MarshallError, unmarshall, b"( 3 4 ") - diff --git a/subvertpy/tests/test_properties.py b/subvertpy/tests/test_properties.py index 5e3aea44..996f03a3 100644 --- a/subvertpy/tests/test_properties.py +++ b/subvertpy/tests/test_properties.py @@ -23,13 +23,16 @@ from subvertpy.tests import ( TestCase, ) + class TestProperties(TestCase): def setUp(self): super(TestProperties, self).setUp() def test_time_from_cstring(self): - self.assertEqual(1225704780716938, properties.time_from_cstring("2008-11-03T09:33:00.716938Z")) + self.assertEqual( + 1225704780716938, + properties.time_from_cstring("2008-11-03T09:33:00.716938Z")) def test_time_from_cstring_independent_from_dst(self): old_tz = os.environ.get('TZ', None) @@ -38,11 +41,14 @@ class TestProperties(TestCase): raise SkipTest("tzset not available on Windows") try: - # First specify a fixed timezone with known DST (late March to late October) + # First specify a fixed timezone with known DST (late March to late + # October) os.environ['TZ'] = 'Europe/London' time.tzset() # Now test a time within that DST - self.assertEqual(1275295762430000, properties.time_from_cstring("2010-05-31T08:49:22.430000Z")) + self.assertEqual( + 1275295762430000, + properties.time_from_cstring("2010-05-31T08:49:22.430000Z")) finally: if old_tz is None: del os.environ['TZ'] @@ -51,46 +57,56 @@ class TestProperties(TestCase): time.tzset() def test_time_to_cstring(self): - self.assertEqual("2008-11-03T09:33:00.716938Z", properties.time_to_cstring(1225704780716938)) + self.assertEqual( + "2008-11-03T09:33:00.716938Z", + properties.time_to_cstring(1225704780716938)) class TestExternalsParser(TestCase): def test_parse_root_relative_externals(self): - self.assertRaises(NotImplementedError, properties.parse_externals_description, - "http://example.com", "third-party/skins ^/foo") + self.assertRaises( + NotImplementedError, + properties.parse_externals_description, + "http://example.com", "third-party/skins ^/foo") def test_parse_scheme_relative_externals(self): - self.assertRaises(NotImplementedError, properties.parse_externals_description, - "http://example.com", "third-party/skins //foo") + self.assertRaises( + NotImplementedError, properties.parse_externals_description, + "http://example.com", "third-party/skins //foo") def test_parse_externals(self): self.assertEqual({ - 'third-party/sounds': (None, "http://sounds.red-bean.com/repos"), - 'third-party/skins': (None, "http://skins.red-bean.com/repositories/skinproj"), - 'third-party/skins/toolkit': (21, "http://svn.red-bean.com/repos/skin-maker")}, - properties.parse_externals_description("http://example.com", -"""third-party/sounds http://sounds.red-bean.com/repos -third-party/skins http://skins.red-bean.com/repositories/skinproj + 'third-party/sounds': + (None, "http://sounds.red-bean.com/repos"), + 'third-party/skins': + (None, "http://skins.red-bean.com/repositories/skinproj"), + 'third-party/skins/toolkit': + (21, "http://svn.red-bean.com/repos/skin-maker")}, + properties.parse_externals_description( + "http://example.com", """\ +third-party/sounds http://sounds.red-bean.com/repos +third-party/skins http://skins.red-bean.com/repositories/skinproj third-party/skins/toolkit -r21 http://svn.red-bean.com/repos/skin-maker""")) def test_parse_externals_space_revno(self): self.assertEqual({ - 'third-party/skins/toolkit': (21, "http://svn.red-bean.com/repos/skin-maker")}, - properties.parse_externals_description("http://example.com", -"""third-party/skins/toolkit -r 21 http://svn.red-bean.com/repos/skin-maker""")) + 'third-party/skins/toolkit': + (21, "http://svn.red-bean.com/repos/skin-maker")}, + properties.parse_externals_description("http://example.com", """\ +third-party/skins/toolkit -r 21 http://svn.red-bean.com/repos/skin-maker""")) def test_parse_externals_swapped(self): - self.assertEqual({'third-party/sounds': (None, "http://sounds.red-bean.com/repos")}, - properties.parse_externals_description("http://example.com", -"""http://sounds.red-bean.com/repos third-party/sounds + self.assertEqual( + {'third-party/sounds': (None, "http://sounds.red-bean.com/repos")}, + properties.parse_externals_description("http://example.com", """\ +http://sounds.red-bean.com/repos third-party/sounds """)) def test_parse_comment(self): self.assertEqual({ 'third-party/sounds': (None, "http://sounds.red-bean.com/repos") }, - properties.parse_externals_description("http://example.com/", -""" + properties.parse_externals_description("http://example.com/", """\ third-party/sounds http://sounds.red-bean.com/repos #third-party/skins http://skins.red-bean.com/repositories/skinproj @@ -100,36 +116,49 @@ third-party/sounds http://sounds.red-bean.com/repos self.assertEqual({ 'third-party/sounds': (None, "http://example.com/branches/other"), }, - properties.parse_externals_description("http://example.com/trunk", -"third-party/sounds ../branches/other")) + properties.parse_externals_description( + "http://example.com/trunk", + "third-party/sounds ../branches/other")) def test_parse_repos_root_relative(self): self.assertEqual({ - 'third-party/sounds': (None, "http://example.com/bar/bla/branches/other"), + 'third-party/sounds': + (None, "http://example.com/bar/bla/branches/other"), }, - properties.parse_externals_description("http://example.com/trunk", -"third-party/sounds /bar/bla/branches/other")) + properties.parse_externals_description( + "http://example.com/trunk", + "third-party/sounds /bar/bla/branches/other")) def test_parse_invalid_missing_url(self): """No URL specified.""" - self.assertRaises(properties.InvalidExternalsDescription, - lambda: properties.parse_externals_description("http://example.com/", "bla")) + self.assertRaises( + properties.InvalidExternalsDescription, + lambda: properties.parse_externals_description( + "http://example.com/", "bla")) def test_parse_invalid_too_much_data(self): """No URL specified.""" - self.assertRaises(properties.InvalidExternalsDescription, - lambda: properties.parse_externals_description(None, "bla -R40 http://bla/")) + self.assertRaises( + properties.InvalidExternalsDescription, + lambda: properties.parse_externals_description( + None, "bla -R40 http://bla/")) class MergeInfoPropertyParserTests(TestCase): def test_simple_range(self): - self.assertEqual({"/trunk": [(1, 2, True)]}, properties.parse_mergeinfo_property("/trunk:1-2\n")) + self.assertEqual( + {"/trunk": [(1, 2, True)]}, + properties.parse_mergeinfo_property("/trunk:1-2\n")) def test_simple_range_uninheritable(self): - self.assertEqual({"/trunk": [(1, 2, False)]}, properties.parse_mergeinfo_property("/trunk:1-2*\n")) + self.assertEqual( + {"/trunk": [(1, 2, False)]}, + properties.parse_mergeinfo_property("/trunk:1-2*\n")) def test_simple_individual(self): - self.assertEqual({"/trunk": [(1, 1, True)]}, properties.parse_mergeinfo_property("/trunk:1\n")) + self.assertEqual( + {"/trunk": [(1, 1, True)]}, + properties.parse_mergeinfo_property("/trunk:1\n")) def test_empty(self): self.assertEqual({}, properties.parse_mergeinfo_property("")) @@ -137,10 +166,15 @@ class MergeInfoPropertyParserTests(TestCase): class MergeInfoPropertyCreatorTests(TestCase): def test_simple_range(self): - self.assertEqual("/trunk:1-2\n", properties.generate_mergeinfo_property({"/trunk": [(1, 2, True)]})) + self.assertEqual( + "/trunk:1-2\n", + properties.generate_mergeinfo_property({"/trunk": [(1, 2, True)]})) def test_simple_individual(self): - self.assertEqual("/trunk:1\n", properties.generate_mergeinfo_property({"/trunk": [(1, 1, True)]})) + self.assertEqual( + "/trunk:1\n", + properties.generate_mergeinfo_property( + {"/trunk": [(1, 1, True)]})) def test_empty(self): self.assertEqual("", properties.generate_mergeinfo_property({})) @@ -151,39 +185,62 @@ class RevnumRangeTests(TestCase): self.assertEqual([(1, 1, True)], properties.range_add_revnum([], 1)) def test_add_revnum_before(self): - self.assertEqual([(2, 2, True), (8, 8, True)], properties.range_add_revnum([(2, 2, True)], 8)) + self.assertEqual( + [(2, 2, True), (8, 8, True)], + properties.range_add_revnum([(2, 2, True)], 8)) def test_add_revnum_included(self): - self.assertEqual([(1, 3, True)], properties.range_add_revnum([(1, 3, True)], 2)) + self.assertEqual( + [(1, 3, True)], + properties.range_add_revnum([(1, 3, True)], 2)) def test_add_revnum_after(self): - self.assertEqual([(1, 3, True), (5, 5, True)], properties.range_add_revnum([(1, 3, True)], 5)) + self.assertEqual( + [(1, 3, True), (5, 5, True)], + properties.range_add_revnum([(1, 3, True)], 5)) def test_add_revnum_extend_before(self): - self.assertEqual([(1, 3, True)], properties.range_add_revnum([(2, 3, True)], 1)) + self.assertEqual( + [(1, 3, True)], + properties.range_add_revnum([(2, 3, True)], 1)) def test_add_revnum_extend_after(self): - self.assertEqual([(1, 3, True)], properties.range_add_revnum([(1, 2, True)], 3)) + self.assertEqual( + [(1, 3, True)], + properties.range_add_revnum([(1, 2, True)], 3)) def test_revnum_includes_empty(self): self.assertFalse(properties.range_includes_revnum([], 2)) def test_revnum_includes_oor(self): - self.assertFalse(properties.range_includes_revnum([(1, 3, True), (4, 5, True)], 10)) + self.assertFalse( + properties.range_includes_revnum( + [(1, 3, True), (4, 5, True)], 10)) def test_revnum_includes_in(self): - self.assertTrue(properties.range_includes_revnum([(1, 3, True), (4, 5, True)], 2)) + self.assertTrue( + properties.range_includes_revnum( + [(1, 3, True), (4, 5, True)], 2)) class MergeInfoIncludeTests(TestCase): + def test_includes_individual(self): - self.assertTrue(properties.mergeinfo_includes_revision({"/trunk": [(1, 1, True)]}, "/trunk", 1)) + self.assertTrue( + properties.mergeinfo_includes_revision( + {"/trunk": [(1, 1, True)]}, "/trunk", 1)) def test_includes_range(self): - self.assertTrue(properties.mergeinfo_includes_revision({"/trunk": [(1, 5, True)]}, "/trunk", 3)) + self.assertTrue( + properties.mergeinfo_includes_revision( + {"/trunk": [(1, 5, True)]}, "/trunk", 3)) def test_includes_invalid_path(self): - self.assertFalse(properties.mergeinfo_includes_revision({"/somepath": [(1, 5, True)]}, "/trunk", 3)) + self.assertFalse( + properties.mergeinfo_includes_revision( + {"/somepath": [(1, 5, True)]}, "/trunk", 3)) def test_includes_invalid_revnum(self): - self.assertFalse(properties.mergeinfo_includes_revision({"/trunk": [(1, 5, True)]}, "/trunk", 30)) + self.assertFalse( + properties.mergeinfo_includes_revision( + {"/trunk": [(1, 5, True)]}, "/trunk", 30)) diff --git a/subvertpy/tests/test_ra.py b/subvertpy/tests/test_ra.py index 5cbefcbb..51ad42b3 100644 --- a/subvertpy/tests/test_ra.py +++ b/subvertpy/tests/test_ra.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -27,6 +27,7 @@ from subvertpy.tests import ( TestCase, ) + class VersionTest(TestCase): def test_version_length(self): @@ -50,8 +51,8 @@ class TestRemoteAccess(SubversionTestCase): def setUp(self): super(TestRemoteAccess, self).setUp() self.repos_url = self.make_repository("d") - self.ra = ra.RemoteAccess(self.repos_url, - auth=ra.Auth([ra.get_username_provider()])) + self.ra = ra.RemoteAccess( + self.repos_url, auth=ra.Auth([ra.get_username_provider()])) def tearDown(self): del self.ra @@ -67,7 +68,7 @@ class TestRemoteAccess(SubversionTestCase): def test_repr(self): self.assertEqual("RemoteAccess(\"%s\")" % self.repos_url, - repr(self.ra)) + repr(self.ra)) def test_latest_revnum(self): self.assertEqual(0, self.ra.get_latest_revnum()) @@ -93,9 +94,11 @@ class TestRemoteAccess(SubversionTestCase): def test_has_capability(self): if ra.api_version() < (1, 5): - self.assertRaises(NotImplementedError, self.ra.has_capability, "FOO") + self.assertRaises(NotImplementedError, self.ra.has_capability, + "FOO") else: - self.assertRaises(SubversionException, self.ra.has_capability, "FOO") + self.assertRaises(SubversionException, self.ra.has_capability, + "FOO") def test_get_dir(self): ret = self.ra.get_dir("", 0) @@ -107,7 +110,8 @@ class TestRemoteAccess(SubversionTestCase): def test_get_dir_kind(self): self.do_commit() - (dirents, fetch_rev, props) = self.ra.get_dir("/", 1, fields=ra.DIRENT_KIND) + (dirents, fetch_rev, props) = self.ra.get_dir( + "/", 1, fields=ra.DIRENT_KIND) self.assertIsInstance(props, dict) self.assertEqual(1, fetch_rev) self.assertEqual(NODE_DIR, dirents["foo"]["kind"]) @@ -123,19 +127,28 @@ class TestRemoteAccess(SubversionTestCase): self.do_commit() class MyFileEditor: - def change_prop(self, name, val): pass + + def change_prop(self, name, val): pass + def close(self, checksum=None): pass class MyDirEditor: - def change_prop(self, name, val): pass + + def change_prop(self, name, val): pass + def add_directory(self, *args): return MyDirEditor() + def add_file(self, *args): return MyFileEditor() + def close(self): pass class MyEditor: - def set_target_revision(self, rev): pass + + def set_target_revision(self, rev): pass + def open_root(self, base_rev): return MyDirEditor() + def close(self): pass reporter = self.ra.do_diff(1, "", self.ra.get_repos_root(), MyEditor()) reporter.set_path("", 0, True) @@ -145,14 +158,15 @@ class TestRemoteAccess(SubversionTestCase): def test_iter_log_invalid(self): self.assertRaises(SubversionException, list, self.ra.iter_log( - ["idontexist"], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])) + ["idontexist"], 0, 0, revprops=[ + "svn:date", "svn:author", "svn:log"])) self.assertRaises(SubversionException, list, self.ra.iter_log( [""], 0, 1000, revprops=["svn:date", "svn:author", "svn:log"])) def test_iter_log(self): def check_results(returned): self.assertEqual(2, len(returned)) - self.assertTrue(len(returned[0]) in (3,4)) + self.assertTrue(len(returned[0]) in (3, 4)) if len(returned[0]) == 3: (paths, revnum, props) = returned[0] else: @@ -165,27 +179,33 @@ class TestRemoteAccess(SubversionTestCase): else: (paths, revnum, props, has_children) = returned[1] if ra.api_version() < (1, 6): - self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, paths) + self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, + paths) else: self.assertEqual({'/foo': ('A', None, -1, NODE_DIR)}, paths) self.assertEqual(revnum, 1) - self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), - set(props.keys())) - returned = list(self.ra.iter_log([""], 0, 0, - revprops=["svn:date", "svn:author", "svn:log"])) + self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), + set(props.keys())) + + returned = list(self.ra.iter_log( + [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])) self.assertEqual(1, len(returned)) self.do_commit() - returned = list(self.ra.iter_log(None, 0, 1, discover_changed_paths=True, - strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"])) + returned = list(self.ra.iter_log( + None, 0, 1, discover_changed_paths=True, + strict_node_history=False, + revprops=["svn:date", "svn:author", "svn:log"])) check_results(returned) def test_get_log(self): returned = [] + def cb(*args): returned.append(args) + def check_results(returned): self.assertEqual(2, len(returned)) - self.assertTrue(len(returned[0]) in (3,4)) + self.assertTrue(len(returned[0]) in (3, 4)) if len(returned[0]) == 3: (paths, revnum, props) = returned[0] else: @@ -199,22 +219,28 @@ class TestRemoteAccess(SubversionTestCase): (paths, revnum, props, has_children) = returned[1] self.assertEqual({'/foo': ('A', None, -1)}, paths) self.assertEqual(revnum, 1) - self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), + self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), set(props.keys())) - self.ra.get_log(cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"]) + self.ra.get_log(cb, [""], 0, 0, + revprops=["svn:date", "svn:author", "svn:log"]) self.assertEqual(1, len(returned)) self.do_commit() returned = [] - self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True, - strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"]) + self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True, + strict_node_history=False, + revprops=["svn:date", "svn:author", "svn:log"]) check_results(returned) def test_get_log_cancel(self): + def cb(*args): raise KeyError + self.do_commit() - self.assertRaises(KeyError, - self.ra.get_log, cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"]) + self.assertRaises( + KeyError, + self.ra.get_log, cb, [""], 0, 0, + revprops=["svn:date", "svn:author", "svn:log"]) def test_get_commit_editor_double_close(self): def mycb(*args): @@ -231,7 +257,8 @@ class TestRemoteAccess(SubversionTestCase): def mycb(rev): pass editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb) - self.assertRaises(ra.BusyException, self.ra.get_commit_editor, + self.assertRaises( + ra.BusyException, self.ra.get_commit_editor, {"svn:log": "foo"}, mycb) editor.abort() @@ -244,11 +271,15 @@ class TestRemoteAccess(SubversionTestCase): self.assertRaises(RuntimeError, root.add_directory, "foo") def test_get_commit_editor_custom_revprops(self): - if ra.version()[:2] < (1,5): + if ra.version()[:2] < (1, 5): return + def mycb(paths, rev, revprops): pass - editor = self.ra.get_commit_editor({"svn:log": "foo", "bar:foo": "bla", "svn:custom:blie": "bloe"}, mycb) + + editor = self.ra.get_commit_editor( + {"svn:log": "foo", "bar:foo": "bla", + "svn:custom:blie": "bloe"}, mycb) root = editor.open_root() root.add_directory("somedir").close() root.close() @@ -256,7 +287,8 @@ class TestRemoteAccess(SubversionTestCase): revprops = self.ra.rev_proplist(1) self.assertEqual( - set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', 'svn:log']), + set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', + 'svn:log']), set(revprops.keys()), "result: %r" % revprops) def test_get_commit_editor_context_manager(self): @@ -357,7 +389,10 @@ class TestRemoteAccess(SubversionTestCase): cb.close() ret = self.ra.stat("bar", 1) - self.assertEqual(set(['last_author', 'kind', 'created_rev', 'has_props', 'time', 'size']), set(ret.keys())) + self.assertEqual( + set(['last_author', 'kind', 'created_rev', 'has_props', 'time', + 'size']), + set(ret.keys())) def test_get_locations_dir(self): cb = self.commit_editor() @@ -372,17 +407,21 @@ class TestRemoteAccess(SubversionTestCase): cb.delete("bar") cb.close() - self.assertEqual({1: "/bar", 2: "/bla"}, - self.ra.get_locations("bla", 2, [1,2])) + self.assertEqual( + {1: "/bar", 2: "/bla"}, + self.ra.get_locations("bla", 2, [1, 2])) - self.assertEqual({1: "/bar", 2: "/bar"}, - self.ra.get_locations("bar", 1, [1,2])) + self.assertEqual( + {1: "/bar", 2: "/bar"}, + self.ra.get_locations("bar", 1, [1, 2])) - self.assertEqual({1: "/bar", 2: "/bar"}, - self.ra.get_locations("bar", 2, [1,2])) + self.assertEqual( + {1: "/bar", 2: "/bar"}, + self.ra.get_locations("bar", 2, [1, 2])) - self.assertEqual({1: "/bar", 2: "/bla", 3: "/bla"}, - self.ra.get_locations("bla", 3, [1,2,3])) + self.assertEqual( + {1: "/bar", 2: "/bla", 3: "/bla"}, + self.ra.get_locations("bla", 3, [1, 2, 3])) class AuthTests(TestCase): @@ -392,47 +431,55 @@ class AuthTests(TestCase): def test_not_registered(self): auth = ra.Auth([]) - self.assertRaises(SubversionException, auth.credentials, "svn.simple", "MyRealm") + self.assertRaises( + SubversionException, auth.credentials, "svn.simple", "MyRealm") def test_simple(self): - auth = ra.Auth([ra.get_simple_prompt_provider(lambda realm, uname, may_save: ("foo", "geheim", False), 0)]) + auth = ra.Auth([ra.get_simple_prompt_provider( + lambda realm, uname, may_save: ("foo", "geheim", False), 0)]) creds = auth.credentials("svn.simple", "MyRealm") self.assertEqual(("foo", "geheim", 0), next(creds)) self.assertRaises(StopIteration, next, creds) def test_username(self): - auth = ra.Auth([ra.get_username_prompt_provider(lambda realm, may_save: ("somebody", False), 0)]) + auth = ra.Auth([ra.get_username_prompt_provider( + lambda realm, may_save: ("somebody", False), 0)]) creds = auth.credentials("svn.username", "MyRealm") self.assertEqual(("somebody", 0), next(creds)) self.assertRaises(StopIteration, next, creds) def test_client_cert(self): - auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(lambda realm, may_save: ("filename", False), 0)]) + auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider( + lambda realm, may_save: ("filename", False), 0)]) creds = auth.credentials("svn.ssl.client-cert", "MyRealm") self.assertEqual(("filename", False), next(creds)) self.assertRaises(StopIteration, next, creds) def test_client_cert_pw(self): - auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider(lambda realm, may_save: ("supergeheim", False), 0)]) + auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider( + lambda realm, may_save: ("supergeheim", False), 0)]) creds = auth.credentials("svn.ssl.client-passphrase", "MyRealm") self.assertEqual(("supergeheim", False), next(creds)) self.assertRaises(StopIteration, next, creds) def test_server_trust(self): - auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: (42, False))]) + auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider( + lambda realm, failures, certinfo, may_save: (42, False))]) auth.set_parameter("svn:auth:ssl:failures", 23) creds = auth.credentials("svn.ssl.server", "MyRealm") self.assertEqual((42, 0), next(creds)) self.assertRaises(StopIteration, next, creds) def test_server_untrust(self): - auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: None)]) + auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider( + lambda realm, failures, certinfo, may_save: None)]) auth.set_parameter("svn:auth:ssl:failures", 23) creds = auth.credentials("svn.ssl.server", "MyRealm") self.assertRaises(StopIteration, next, creds) def test_retry(self): self.i = 0 + def inc_foo(realm, may_save): self.i += 1 return ("somebody%d" % self.i, False) diff --git a/subvertpy/tests/test_repos.py b/subvertpy/tests/test_repos.py index 21a0f4f1..ed5f2067 100644 --- a/subvertpy/tests/test_repos.py +++ b/subvertpy/tests/test_repos.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -46,7 +46,8 @@ class TestRepository(TestCaseInTempDir): def test_capability(self): r = repos.create(os.path.join(self.test_dir, "foo")) if repos.api_version() < (1, 5): - self.assertRaises(NotImplementedError, r.has_capability, "mergeinfo") + self.assertRaises(NotImplementedError, r.has_capability, + "mergeinfo") else: self.assertIsInstance(r.has_capability("mergeinfo"), bool) @@ -70,13 +71,15 @@ class TestRepository(TestCaseInTempDir): def test_rev_root(self): repos.create(os.path.join(self.test_dir, "foo")) - self.assertTrue(repos.Repository("foo").fs().revision_root(0) is not None) + self.assertTrue( + repos.Repository("foo").fs().revision_root(0) is not None) def test_load_fs_invalid(self): r = repos.create(os.path.join(self.test_dir, "foo")) dumpfile = b"Malformed" feedback = BytesIO() - self.assertRaises(SubversionException, r.load_fs, BytesIO(dumpfile), + self.assertRaises( + SubversionException, r.load_fs, BytesIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT) def test_load_fs(self): @@ -98,15 +101,19 @@ class TestRepository(TestCaseInTempDir): """).encode("ascii") feedback = BytesIO() r.load_fs(BytesIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT) - self.assertEqual(r.fs().get_uuid(), "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca") + self.assertEqual(r.fs().get_uuid(), + "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca") def test_rev_props(self): repos.create(os.path.join(self.test_dir, "foo")) - self.assertEqual(["svn:date"], list(repos.Repository("foo").fs().revision_proplist(0).keys())) + self.assertEqual( + ["svn:date"], + list(repos.Repository("foo").fs().revision_proplist(0).keys())) def test_rev_root_invalid(self): repos.create(os.path.join(self.test_dir, "foo")) - self.assertRaises(SubversionException, repos.Repository("foo").fs().revision_root, 1) + self.assertRaises(SubversionException, + repos.Repository("foo").fs().revision_root, 1) def test_pack_fs(self): r = repos.create(os.path.join(self.test_dir, "foo")) @@ -121,15 +128,17 @@ class TestRepository(TestCaseInTempDir): repos.create(os.path.join(self.test_dir, "foo")) root = repos.Repository("foo").fs().revision_root(0) self.assertEqual(True, root.is_dir("")) - # TODO(jelmer): Newer versions of libsvn_repos crash when passed a nonexistant path. - #self.assertEqual(False, root.is_dir("nonexistant")) + # TODO(jelmer): Newer versions of libsvn_repos crash when passed a + # nonexistant path. + # self.assertEqual(False, root.is_dir("nonexistant")) def test_is_file(self): repos.create(os.path.join(self.test_dir, "foo")) root = repos.Repository("foo").fs().revision_root(0) self.assertEqual(False, root.is_file("")) - # TODO(jelmer): Newer versions of libsvn_repos crash when passed a nonexistant path. - #self.assertEqual(False, root.is_file("nonexistant")) + # TODO(jelmer): Newer versions of libsvn_repos crash when passed a + # nonexistant path. + # self.assertEqual(False, root.is_file("nonexistant")) class StreamTests(TestCase): diff --git a/subvertpy/tests/test_server.py b/subvertpy/tests/test_server.py index a0430320..80062b67 100644 --- a/subvertpy/tests/test_server.py +++ b/subvertpy/tests/test_server.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -14,7 +14,3 @@ # along with this program. If not, see . """Subversion server tests.""" - -from subvertpy.ra_svn import SVNServer -from subvertpy.tests import SubversionTestCase - diff --git a/subvertpy/tests/test_wc.py b/subvertpy/tests/test_wc.py index 08dd9d51..940cfc6e 100644 --- a/subvertpy/tests/test_wc.py +++ b/subvertpy/tests/test_wc.py @@ -1,5 +1,5 @@ # Copyright (C) 2005-2007 Jelmer Vernooij - + # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or @@ -29,6 +29,7 @@ from subvertpy.tests import ( TestCase, ) + class VersionTest(TestCase): def test_version_length(self): @@ -65,9 +66,10 @@ class WorkingCopyTests(TestCase): def test_match_ignore_list(self): if wc.api_version() < (1, 5): - self.assertRaises(NotImplementedError, wc.match_ignore_list, "foo", []) - return # Skip test - self.assertTrue(wc.match_ignore_list("foo", [ "f*"])) + self.assertRaises( + NotImplementedError, wc.match_ignore_list, "foo", []) + return # Skip test + self.assertTrue(wc.match_ignore_list("foo", ["f*"])) self.assertTrue(wc.match_ignore_list("foo", ["foo"])) self.assertFalse(wc.match_ignore_list("foo", [])) self.assertFalse(wc.match_ignore_list("foo", ["bar"])) @@ -76,12 +78,12 @@ class WorkingCopyTests(TestCase): class WcTests(SubversionTestCase): def test_revision_status(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") ret = wc.revision_status("checkout") self.assertEqual((0, 0, 0, 0), ret) def test_revision_status_trailing(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") ret = wc.revision_status("checkout/") self.assertEqual((0, 0, 0, 0), ret) @@ -95,7 +97,7 @@ class AdmTests(SubversionTestCase): "Subversion 1.7 API for WorkingCopy not yet supported") def test_has_binary_prop(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"\x00 \x01"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout") @@ -108,24 +110,27 @@ class AdmTests(SubversionTestCase): self.build_tree({"checkout/bar": b"\x00 \x01"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout") - path = os.path.join(self.test_dir, "checkout/bar") - self.assertEqual(("%s/bar" % repos_url, 0), adm.get_ancestry("checkout/bar")) + self.assertEqual(("%s/bar" % repos_url, 0), + adm.get_ancestry("checkout/bar")) adm.close() def test_maybe_set_repos_root(self): repos_url = self.make_client("repos", "checkout") adm = wc.WorkingCopy(None, "checkout") - adm.maybe_set_repos_root(os.path.join(self.test_dir, "checkout"), repos_url) + adm.maybe_set_repos_root( + os.path.join(self.test_dir, "checkout"), repos_url) adm.close() def test_add_repos_file(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") adm = wc.WorkingCopy(None, "checkout", True) - adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"), BytesIO(b"contents"), {}, {}) - self.assertEqual(b"basecontents", wc.get_pristine_contents("checkout/bar").read()) + adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"), + BytesIO(b"contents"), {}, {}) + self.assertEqual(b"basecontents", + wc.get_pristine_contents("checkout/bar").read()) def test_mark_missing_deleted(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"\x00 \x01"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout", True) @@ -134,7 +139,7 @@ class AdmTests(SubversionTestCase): self.assertFalse(os.path.exists("checkout/bar")) def test_remove_from_revision_control(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"\x00 \x01"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout", True) @@ -142,12 +147,12 @@ class AdmTests(SubversionTestCase): self.assertTrue(os.path.exists("checkout/bar")) def test_relocate(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") adm = wc.WorkingCopy(None, "checkout", True) adm.relocate("checkout", "file://", "http://") def test_translated_stream(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"My id: $Id$"}) self.client_add('checkout/bar') self.client_set_prop("checkout/bar", "svn:keywords", "Id\n") @@ -158,7 +163,7 @@ class AdmTests(SubversionTestCase): self.assertTrue(stream.read().startswith(b"My id: $Id: ")) def test_text_modified(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"My id: $Id$"}) self.client_add('checkout/bar') self.client_set_prop("checkout/bar", "svn:keywords", "Id\n") @@ -169,7 +174,7 @@ class AdmTests(SubversionTestCase): self.assertTrue(adm.text_modified("checkout/bar", True)) def test_props_modified(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"My id: $Id$"}) self.client_add('checkout/bar') self.client_set_prop("checkout/bar", "svn:keywords", "Id\n") @@ -180,7 +185,7 @@ class AdmTests(SubversionTestCase): self.assertTrue(adm.props_modified("checkout/bar")) def test_prop_set(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"file"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout", True) @@ -193,17 +198,18 @@ class AdmTests(SubversionTestCase): if getattr(wc, "CommittedQueue", None) is None: raise SkipTest("CommittedQueue not available") cq = wc.CommittedQueue() - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") adm = wc.WorkingCopy(None, "checkout", True) - adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer") + adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", + "jelmer") def test_entry_not_found(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") adm = wc.WorkingCopy(None, "checkout") self.assertRaises(KeyError, adm.entry, "bar") def test_entry(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"\x00 \x01"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout") @@ -219,12 +225,12 @@ class AdmTests(SubversionTestCase): self.assertEqual(1, entry.revision) def test_get_actual_target(self): - repos_url = self.make_client("repos", ".") + self.make_client("repos", ".") self.assertEqual((self.test_dir, "bla"), - wc.get_actual_target("%s/bla" % self.test_dir)) + wc.get_actual_target("%s/bla" % self.test_dir)) def test_is_wc_root(self): - repos_url = self.make_client("repos", ".") + self.make_client("repos", ".") self.build_tree({"bar": None}) self.client_add('bar') adm = wc.WorkingCopy(None, ".") @@ -232,7 +238,7 @@ class AdmTests(SubversionTestCase): self.assertFalse(adm.is_wc_root(os.path.join(self.test_dir, "bar"))) def test_status(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"text"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout") @@ -242,10 +248,11 @@ class AdmTests(SubversionTestCase): self.assertEqual(wc.STATUS_NORMAL, adm.status('bar').status) def test_transmit_text_deltas(self): - repos_url = self.make_client("repos", ".") + self.make_client("repos", ".") self.build_tree({"bar": b"blala"}) self.client_add('bar') adm = wc.WorkingCopy(None, ".", True) + class Editor(object): """Editor""" @@ -261,7 +268,8 @@ class AdmTests(SubversionTestCase): pass editor = Editor() (tmpfile, digest) = adm.transmit_text_deltas("bar", True, editor) - self.assertEqual(editor._windows, [(0, 0, 5, 0, [(2, 0, 5)], b'blala'), None]) + self.assertEqual(editor._windows, + [(0, 0, 5, 0, [(2, 0, 5)], b'blala'), None]) self.assertIsInstance(tmpfile, str) self.assertEqual(16, len(digest)) @@ -271,7 +279,8 @@ class AdmTests(SubversionTestCase): cq = wc.CommittedQueue() cq.queue("bar", adm) - adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer") + adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", + "jelmer") bar = adm.entry("bar") self.assertEqual("bar", bar.name) self.assertEqual(NODE_FILE, bar.kind) @@ -281,20 +290,21 @@ class AdmTests(SubversionTestCase): self.assertEqual(1, bar.revision) def test_process_committed_queue(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"la"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout", True) cq = wc.CommittedQueue() cq.queue(os.path.join(self.test_dir, "checkout/bar"), adm) - adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer") + adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", + "jelmer") bar = adm.entry("checkout/bar") self.assertEqual("bar", bar.name) self.assertEqual(NODE_FILE, bar.kind) self.assertEqual(wc.SCHEDULE_ADD, bar.schedule) def test_probe_try(self): - repos_url = self.make_client("repos", "checkout") + self.make_client("repos", "checkout") self.build_tree({"checkout/bar": b"la"}) self.client_add('checkout/bar') adm = wc.WorkingCopy(None, "checkout", True) @@ -304,4 +314,6 @@ class AdmTests(SubversionTestCase): (msg, num) = e.args if num != subvertpy.ERR_WC_NOT_WORKING_COPY: raise - self.assertEqual("checkout", adm.probe_try(os.path.join("checkout", "bar")).access_path()) + self.assertEqual( + "checkout", + adm.probe_try(os.path.join("checkout", "bar")).access_path()) -- cgit v1.2.3