summaryrefslogtreecommitdiff
path: root/subvertpy
diff options
context:
space:
mode:
authorJelmer Vernooij <jelmer@jelmer.uk>2017-07-16 21:31:45 +0000
committerJelmer Vernooij <jelmer@jelmer.uk>2017-07-16 21:31:45 +0000
commitc1b317d2a65ce63720cf3edd488e868532918add (patch)
treed81c4093d903068b6cd069cd909d12f6f2af056b /subvertpy
parentd0930fe3cc3e3d349631650bb1da9c01134c5e95 (diff)
Fix style issues.
Diffstat (limited to 'subvertpy')
-rw-r--r--subvertpy/__init__.py13
-rw-r--r--subvertpy/delta.py14
-rw-r--r--subvertpy/marshall.py17
-rw-r--r--subvertpy/properties.py43
-rw-r--r--subvertpy/ra.py13
-rw-r--r--subvertpy/ra_svn.py178
-rw-r--r--subvertpy/server.py6
-rw-r--r--subvertpy/tests/__init__.py48
-rw-r--r--subvertpy/tests/test_client.py19
-rw-r--r--subvertpy/tests/test_core.py6
-rw-r--r--subvertpy/tests/test_delta.py12
-rw-r--r--subvertpy/tests/test_marshall.py12
-rw-r--r--subvertpy/tests/test_properties.py151
-rw-r--r--subvertpy/tests/test_ra.py139
-rw-r--r--subvertpy/tests/test_repos.py31
-rw-r--r--subvertpy/tests/test_server.py6
-rw-r--r--subvertpy/tests/test_wc.py82
17 files changed, 485 insertions, 305 deletions
diff --git a/subvertpy/__init__.py b/subvertpy/__init__.py
index b588ca8c..6efcea11 100644
--- a/subvertpy/__init__.py
+++ b/subvertpy/__init__.py
@@ -92,10 +92,10 @@ AUTH_PARAM_DEFAULT_USERNAME = 'svn:auth:username'
AUTH_PARAM_DEFAULT_PASSWORD = 'svn:auth:password'
SSL_NOTYETVALID = 0x00000001
-SSL_EXPIRED = 0x00000002
-SSL_CNMISMATCH = 0x00000004
-SSL_UNKNOWNCA = 0x00000008
-SSL_OTHER = 0x40000000
+SSL_EXPIRED = 0x00000002
+SSL_CNMISMATCH = 0x00000004
+SSL_UNKNOWNCA = 0x00000008
+SSL_OTHER = 0x40000000
class SubversionException(Exception):
@@ -109,7 +109,7 @@ class SubversionException(Exception):
def _check_mtime(m):
"""Check whether a C extension is out of date.
-
+
:param m: Python module that is a C extension
"""
import os
@@ -121,6 +121,7 @@ def _check_mtime(m):
return False
return True
+
try:
from subvertpy import client, _ra, repos, wc
for x in client, _ra, repos, wc:
@@ -130,5 +131,3 @@ try:
break
except ImportError as e:
raise ImportError("Unable to load subvertpy extensions: %s" % e)
-
-
diff --git a/subvertpy/delta.py b/subvertpy/delta.py
index 9dbf7b58..0e354490 100644
--- a/subvertpy/delta.py
+++ b/subvertpy/delta.py
@@ -32,6 +32,7 @@ MAX_ENCODED_INT_LEN = 10
DELTA_WINDOW_SIZE = 102400
+
def apply_txdelta_window(sbuf, window):
"""Apply a txdelta window to a buffer.
@@ -60,9 +61,10 @@ def apply_txdelta_handler_chunks(source_chunks, target_chunks):
:param target_stream: Target stream
"""
sbuf = bytes().join(source_chunks)
+
def apply_window(window):
if window is None:
- return # Last call
+ return # Last call
target_chunks.append(apply_txdelta_window(sbuf, window))
return apply_window
@@ -75,7 +77,7 @@ def apply_txdelta_handler(sbuf, target_stream):
"""
def apply_window(window):
if window is None:
- return # Last call
+ return # Last call
target_stream.write(apply_txdelta_window(sbuf, window))
return apply_window
@@ -136,16 +138,16 @@ def encode_length(len):
# Count number of required bytes
v = len >> 7
- n = 1;
+ n = 1
while v > 0:
v = v >> 7
- n+=1
+ n += 1
assert n <= MAX_ENCODED_INT_LEN
ret = bytearray()
while n > 0:
- n-=1
+ n -= 1
if n > 0:
cont = 1
else:
@@ -211,6 +213,7 @@ def unpack_svndiff_instruction(text):
SVNDIFF0_HEADER = b"SVN\0"
+
def pack_svndiff0_window(window):
"""Pack an individual window using svndiff0.
@@ -273,4 +276,3 @@ def unpack_svndiff0(text):
newdata = text[:newdata_len]
text = text[newdata_len:]
yield (sview_offset, sview_len, tview_len, len(ops), ops, newdata)
-
diff --git a/subvertpy/marshall.py b/subvertpy/marshall.py
index a5a73154..1970da41 100644
--- a/subvertpy/marshall.py
+++ b/subvertpy/marshall.py
@@ -16,6 +16,7 @@
"""Marshalling for the svn_ra protocol."""
+
class literal:
"""A protocol literal."""
@@ -30,10 +31,10 @@ class literal:
# 1. Syntactic structure
# ----------------------
-#
+#
# The Subversion protocol is specified in terms of the following
# syntactic elements, specified using ABNF [RFC 2234]:
-#
+#
# item = word / number / string / list
# word = ALPHA *(ALPHA / DIGIT / "-") space
# number = 1*DIGIT space
@@ -41,7 +42,8 @@ class literal:
# ; digits give the byte count of the *OCTET portion
# list = "(" space *item ")" space
# space = 1*(SP / LF)
-#
+#
+
class MarshallError(Exception):
"""A Marshall error."""
@@ -53,7 +55,7 @@ class NeedMoreData(MarshallError):
def marshall(x):
"""Marshall a Python data item.
-
+
:param x: Data item
:return: encoded byte string
"""
@@ -69,9 +71,9 @@ def marshall(x):
x = x.encode("utf-8")
return ("%d:" % len(x)).encode("ascii") + x + b" "
elif isinstance(x, bool):
- if x == True:
+ if x is True:
return b"true "
- elif x == False:
+ elif x is False:
return b"false "
raise MarshallError("Unable to marshall type %s" % x)
@@ -85,7 +87,7 @@ def unmarshall(x):
whitespace = frozenset(b'\n ')
if len(x) == 0:
raise NeedMoreData("Not enough data")
- if x[0:1] == b"(": # list follows
+ if x[0:1] == b"(": # list follows
if len(x) <= 1:
raise NeedMoreData("Missing whitespace")
if x[1:2] != b" ":
@@ -143,4 +145,3 @@ def unmarshall(x):
return (x[1:], ret.decode("ascii"))
else:
raise MarshallError("Unexpected character '%c'" % x[0])
-
diff --git a/subvertpy/properties.py b/subvertpy/properties.py
index a86c2331..567f60cc 100644
--- a/subvertpy/properties.py
+++ b/subvertpy/properties.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -18,7 +18,10 @@
__author__ = "Jelmer Vernooij <jelmer@jelmer.uk>"
__docformat__ = "restructuredText"
-import bisect, calendar, time
+
+import bisect
+import calendar
+import time
try:
import urlparse
except ImportError:
@@ -38,7 +41,7 @@ def is_valid_property_name(prop):
if not prop[0].isalnum() and not prop[0] in ":_":
return False
for c in prop[1:]:
- if not c.isalnum() and not c in "-:._":
+ if not c.isalnum() and c not in "-:._":
return False
return True
@@ -50,14 +53,15 @@ def time_to_cstring(timestamp):
:return: string with date
"""
tm_usec = timestamp % 1000000
- (tm_year, tm_mon, tm_mday, tm_hour, tm_min,
- tm_sec, tm_wday, tm_yday, tm_isdst) = time.gmtime(timestamp / 1000000)
- return "%04d-%02d-%02dT%02d:%02d:%02d.%06dZ" % (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_usec)
+ (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_wday, tm_yday,
+ tm_isdst) = time.gmtime(timestamp / 1000000)
+ return "%04d-%02d-%02dT%02d:%02d:%02d.%06dZ" % (
+ tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_usec)
def time_from_cstring(text):
"""Parse a time from a cstring.
-
+
:param text: Parse text
:return: number of microseconds since the start of 1970
"""
@@ -71,11 +75,11 @@ def time_from_cstring(text):
def parse_externals_description(base_url, val):
"""Parse an svn:externals property value.
- :param base_url: URL on which the property is set. Used for
+ :param base_url: URL on which the property is set. Used for
relative externals.
:returns: dictionary with local names as keys, (revnum, url)
- as value. revnum is the revision number and is
+ as value. revnum is the revision number and is
set to None if not applicable.
"""
def is_url(u):
@@ -84,24 +88,24 @@ def parse_externals_description(base_url, val):
for l in val.splitlines():
if l == "" or l[0] == "#":
continue
- pts = l.rsplit(None, 3)
+ pts = l.rsplit(None, 3)
if len(pts) == 4:
- if pts[0] == "-r": # -r X URL DIR
+ if pts[0] == "-r": # -r X URL DIR
revno = int(pts[1])
path = pts[3]
relurl = pts[2]
- elif pts[1] == "-r": # DIR -r X URL
+ elif pts[1] == "-r": # DIR -r X URL
revno = int(pts[2])
path = pts[0]
relurl = pts[3]
else:
raise InvalidExternalsDescription()
elif len(pts) == 3:
- if pts[1].startswith("-r"): # DIR -rX URL
+ if pts[1].startswith("-r"): # DIR -rX URL
revno = int(pts[1][2:])
path = pts[0]
relurl = pts[2]
- elif pts[0].startswith("-r"): # -rX URL DIR
+ elif pts[0].startswith("-r"): # -rX URL DIR
revno = int(pts[0][2:])
path = pts[2]
relurl = pts[1]
@@ -118,9 +122,11 @@ def parse_externals_description(base_url, val):
else:
raise InvalidExternalsDescription()
if relurl.startswith("//"):
- raise NotImplementedError("Relative to the scheme externals not yet supported")
+ raise NotImplementedError(
+ "Relative to the scheme externals not yet supported")
if relurl.startswith("^/"):
- raise NotImplementedError("Relative to the repository root externals not yet supported")
+ raise NotImplementedError(
+ "Relative to the repository root externals not yet supported")
ret[path] = (revno, urlparse.urljoin(base_url+"/", relurl))
return ret
@@ -274,13 +280,14 @@ PROP_REVISION_AUTHOR = "svn:author"
PROP_REVISION_DATE = "svn:date"
PROP_REVISION_ORIGINAL_DATE = "svn:original-date"
+
def diff(current, previous):
"""Find the differences between two property dictionaries.
:param current: Dictionary with current (new) properties
:param previous: Dictionary with previous (old) properties
- :return: Dictionary that contains an entry for
- each property that was changed. Value is a tuple
+ :return: Dictionary that contains an entry for
+ each property that was changed. Value is a tuple
with the old and the new property value.
"""
ret = {}
diff --git a/subvertpy/ra.py b/subvertpy/ra.py
index 9963edda..95011702 100644
--- a/subvertpy/ra.py
+++ b/subvertpy/ra.py
@@ -17,11 +17,11 @@
__author__ = "Jelmer Vernooij <jelmer@jelmer.uk>"
-from subvertpy import SubversionException, ERR_BAD_URL
+from subvertpy import SubversionException, ERR_BAD_URL
from subvertpy import _ra
-from subvertpy._ra import *
-from subvertpy import ra_svn
+from subvertpy._ra import * # noqa: F403,F401
+from subvertpy import ra_svn # noqa: F401
try:
from urllib2 import splittype
@@ -30,14 +30,15 @@ except ImportError:
url_handlers = {
"svn": _ra.RemoteAccess,
-# "svn": ra_svn.Client,
+ # "svn": ra_svn.Client,
"svn+ssh": _ra.RemoteAccess,
-# "svn+ssh": ra_svn.Client,
+ # "svn+ssh": ra_svn.Client,
"http": _ra.RemoteAccess,
"https": _ra.RemoteAccess,
"file": _ra.RemoteAccess,
}
+
def RemoteAccess(url, *args, **kwargs):
"""Connect to a remote Subversion server
@@ -47,6 +48,6 @@ def RemoteAccess(url, *args, **kwargs):
if isinstance(url, bytes):
url = url.decode("utf-8")
(type, opaque) = splittype(url)
- if not type in url_handlers:
+ if type not in url_handlers:
raise SubversionException("Unknown URL type '%s'" % type, ERR_BAD_URL)
return url_handlers[type](url, *args, **kwargs)
diff --git a/subvertpy/ra_svn.py b/subvertpy/ra_svn.py
index 7b92bfeb..3e61e283 100644
--- a/subvertpy/ra_svn.py
+++ b/subvertpy/ra_svn.py
@@ -122,7 +122,7 @@ class SVNConnection(object):
except NeedMoreData:
newdata = self.recv_fn(1)
if newdata != "":
- #self.mutter("IN: %r" % newdata)
+ # self.mutter("IN: %r" % newdata)
self.inbuffer += newdata
def send_msg(self, data):
@@ -158,7 +158,8 @@ def feed_editor(conn, editor):
if len(args[3]) == 0:
token = tokens[args[1]].add_directory(args[0])
else:
- token = tokens[args[1]].add_directory(args[0], args[3][0], args[4][0])
+ token = tokens[args[1]].add_directory(
+ args[0], args[3][0], args[4][0])
tokens[args[2]] = token
elif command == "open-dir":
tokens[args[2]] = tokens[args[1]].open_directory(args[0], args[3])
@@ -175,15 +176,18 @@ def feed_editor(conn, editor):
if len(args[3]) == 0:
token = tokens[args[1]].add_file(args[0])
else:
- token = tokens[args[1]].add_file(args[0], args[3][0], args[4][0])
+ token = tokens[args[1]].add_file(
+ args[0], args[3][0], args[4][0])
tokens[args[2]] = token
elif command == "open-file":
tokens[args[2]] = tokens[args[1]].open_file(args[0], args[3])
elif command == "apply-textdelta":
if len(args[1]) == 0:
- txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(None)
+ txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(
+ None)
else:
- txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(args[1][0])
+ txdelta_handler[args[0]] = tokens[args[0]].apply_textdelta(
+ args[1][0])
diff[args[0]] = ""
elif command == "textdelta-chunk":
diff[args[0]] += args[1]
@@ -220,7 +224,7 @@ class Reporter(object):
self.conn = conn
self.editor = editor
- def set_path(self, path, rev, start_empty=False, lock_token=None,
+ def set_path(self, path, rev, start_empty=False, lock_token=None,
depth=None):
args = [path, rev, start_empty]
if lock_token is not None:
@@ -235,8 +239,8 @@ class Reporter(object):
def delete_path(self, path):
self.conn.send_msg([literal("delete-path"), [path]])
- def link_path(self, path, url, rev, start_empty=False, lock_token=None,
- depth=None):
+ def link_path(self, path, url, rev, start_empty=False, lock_token=None,
+ depth=None):
args = [path, url, rev, start_empty]
if lock_token is not None:
args.append([lock_token])
@@ -249,7 +253,7 @@ class Reporter(object):
def finish(self):
self.conn.send_msg([literal("finish-report"), []])
- auth = self.conn.recv_msg()
+ self.conn.recv_msg()
feed_editor(self.conn, self.editor)
self.conn.busy = False
@@ -301,18 +305,21 @@ class DirectoryEditor(object):
copyfrom_data = [copyfrom_path, copyfrom_rev]
else:
copyfrom_data = []
- self.conn.send_msg([literal("add-file"), [path, self.id, child, copyfrom_data]])
+ self.conn.send_msg([literal("add-file"),
+ [path, self.id, child, copyfrom_data]])
return FileEditor(self.conn, child)
def open_file(self, path, base_revnum):
self._is_last_open()
child = generate_random_id()
- self.conn.send_msg([literal("open-file"), [path, self.id, child, base_revnum]])
+ self.conn.send_msg([literal("open-file"),
+ [path, self.id, child, base_revnum]])
return FileEditor(self.conn, child)
def delete_entry(self, path, base_revnum):
self._is_last_open()
- self.conn.send_msg([literal("delete-entry"), [path, base_revnum, self.id]])
+ self.conn.send_msg([literal("delete-entry"),
+ [path, base_revnum, self.id]])
def add_directory(self, path, copyfrom_path=None, copyfrom_rev=-1):
self._is_last_open()
@@ -321,13 +328,15 @@ class DirectoryEditor(object):
copyfrom_data = [copyfrom_path, copyfrom_rev]
else:
copyfrom_data = []
- self.conn.send_msg([literal("add-dir"), [path, self.id, child, copyfrom_data]])
+ self.conn.send_msg([literal("add-dir"),
+ [path, self.id, child, copyfrom_data]])
return DirectoryEditor(self.conn, child)
def open_directory(self, path, base_revnum):
self._is_last_open()
child = generate_random_id()
- self.conn.send_msg([literal("open-dir"), [path, self.id, child, base_revnum]])
+ self.conn.send_msg([literal("open-dir"),
+ [path, self.id, child, base_revnum]])
return DirectoryEditor(self.conn, child)
def change_prop(self, name, value):
@@ -336,7 +345,8 @@ class DirectoryEditor(object):
value = []
else:
value = [value]
- self.conn.send_msg([literal("change-dir-prop"), [self.id, name, value]])
+ self.conn.send_msg([literal("change-dir-prop"),
+ [self.id, name, value]])
def _is_last_open(self):
assert self.conn._open_ids[-1] == self.id
@@ -374,13 +384,17 @@ class FileEditor(object):
base_check = []
else:
base_check = [base_checksum]
- self.conn.send_msg([literal("apply-textdelta"), [self.id, base_check]])
- self.conn.send_msg([literal("textdelta-chunk"), [self.id, SVNDIFF0_HEADER]])
+ self.conn.send_msg([literal("apply-textdelta"),
+ [self.id, base_check]])
+ self.conn.send_msg([literal("textdelta-chunk"),
+ [self.id, SVNDIFF0_HEADER]])
+
def send_textdelta(delta):
if delta is None:
self.conn.send_msg([literal("textdelta-end"), [self.id]])
else:
- self.conn.send_msg([literal("textdelta-chunk"), [self.id, pack_svndiff0_window(delta)]])
+ self.conn.send_msg([literal("textdelta-chunk"),
+ [self.id, pack_svndiff0_window(delta)]])
return send_textdelta
def change_prop(self, name, value):
@@ -389,11 +403,12 @@ class FileEditor(object):
value = []
else:
value = [value]
- self.conn.send_msg([literal("change-file-prop"), [self.id, name, value]])
+ self.conn.send_msg([literal("change-file-prop"),
+ [self.id, name, value]])
def mark_busy(unbound):
-
+
def convert(self, *args, **kwargs):
self.busy = True
try:
@@ -406,6 +421,7 @@ def mark_busy(unbound):
convert.__name__ = unbound.__name__
return convert
+
def unmarshall_dirent(d):
ret = {
"name": d[0],
@@ -423,7 +439,7 @@ def unmarshall_dirent(d):
class SVNClient(SVNConnection):
- def __init__(self, url, progress_cb=None, auth=None, config=None,
+ def __init__(self, url, progress_cb=None, auth=None, config=None,
client_string_func=None, open_tmp_file_func=None):
self.url = url
(type, opaque) = urlparse.splittype(url)
@@ -439,12 +455,19 @@ class SVNClient(SVNConnection):
else:
(recv_func, send_func) = self._connect_ssh(host)
super(SVNClient, self).__init__(recv_func, send_func)
- (min_version, max_version, _, self._server_capabilities) = self._recv_greeting()
- self.send_msg([max_version, [literal(x) for x in CAPABILITIES if x in self._server_capabilities], self.url])
+ (min_version, max_version, _, self._server_capabilities) = (
+ self._recv_greeting())
+ self.send_msg(
+ [max_version,
+ [literal(x) for x in CAPABILITIES
+ if x in self._server_capabilities],
+ self.url])
(self._server_mechanisms, mech_arg) = self._unpack()
if self._server_mechanisms != []:
# FIXME: Support other mechanisms as well
- self.send_msg([literal("ANONYMOUS"), [base64.b64encode("anonymous@%s" % socket.gethostname())]])
+ self.send_msg([literal("ANONYMOUS"),
+ [base64.b64encode(
+ "anonymous@%s" % socket.gethostname())]])
self.recv_msg()
msg = self._unpack()
if len(msg) > 2:
@@ -475,9 +498,11 @@ class SVNClient(SVNConnection):
def _connect(self, host):
(host, port) = urlparse.splitnport(host, SVN_PORT)
- sockaddrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ sockaddrs = socket.getaddrinfo(
+ host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, 0)
self._socket = None
+ err = RuntimeError('no addresses for %s:%s' % (host, port))
for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
try:
self._socket = socket.socket(family, socktype, proto)
@@ -500,7 +525,8 @@ class SVNClient(SVNConnection):
else:
password = None
(host, port) = urlparse.splitnport(host, 22)
- self._tunnel = get_ssh_vendor().connect_ssh(user, password, host, port, ["svnserve", "-t"])
+ self._tunnel = get_ssh_vendor().connect_ssh(
+ user, password, host, port, ["svnserve", "-t"])
return (self._tunnel.recv, self._tunnel.send)
def get_file_revs(self, path, start, end, file_rev_handler):
@@ -508,7 +534,8 @@ class SVNClient(SVNConnection):
@mark_busy
def get_locations(self, path, peg_revision, location_revisions):
- self.send_msg([literal("get-locations"), [path, peg_revision, location_revisions]])
+ self.send_msg([literal("get-locations"), [path, peg_revision,
+ location_revisions]])
self._recv_ack()
ret = {}
while True:
@@ -530,10 +557,11 @@ class SVNClient(SVNConnection):
def unlock(self, path_tokens, break_lock, lock_func):
raise NotImplementedError(self.unlock)
- def mergeinfo(self, paths, revision=-1, inherit=None, include_descendants=False):
+ def mergeinfo(self, paths, revision=-1, inherit=None,
+ include_descendants=False):
raise NotImplementedError(self.mergeinfo)
- def location_segments(self, path, start_revision, end_revision,
+ def location_segments(self, path, start_revision, end_revision,
include_merged_revisions=False):
args = [path]
if start_revision is None or start_revision == -1:
@@ -571,7 +599,8 @@ class SVNClient(SVNConnection):
self.send_msg([literal("check-path"), args])
self._recv_ack()
ret = self._unpack()[0]
- return {"dir": NODE_DIR, "file": NODE_FILE, "unknown": NODE_UNKNOWN, "none": NODE_NONE}[ret]
+ return {"dir": NODE_DIR, "file": NODE_FILE, "unknown": NODE_UNKNOWN,
+ "none": NODE_NONE}[ret]
def get_lock(self, path):
self.send_msg([literal("get-lock"), [path]])
@@ -583,13 +612,14 @@ class SVNClient(SVNConnection):
return ret[0]
@mark_busy
- def get_dir(self, path, revision=-1, dirent_fields=0, want_props=True, want_contents=True):
+ def get_dir(self, path, revision=-1, dirent_fields=0, want_props=True,
+ want_contents=True):
args = [path]
if revision is None or revision == -1:
args.append([])
else:
args.append([revision])
-
+
args += [want_props, want_contents]
fields = []
@@ -646,7 +676,7 @@ class SVNClient(SVNConnection):
self._recv_ack()
self._unparse()
- def get_commit_editor(self, revprops, callback=None, lock_tokens=None,
+ def get_commit_editor(self, revprops, callback=None, lock_tokens=None,
keep_locks=False):
args = [revprops[properties.PROP_REVISION_LOG]]
if lock_tokens is not None:
@@ -675,16 +705,19 @@ class SVNClient(SVNConnection):
return ret[0]
@mark_busy
- def replay(self, revision, low_water_mark, update_editor, send_deltas=True):
- self.send_msg([literal("replay"), [revision, low_water_mark, send_deltas]])
+ def replay(self, revision, low_water_mark, update_editor,
+ send_deltas=True):
+ self.send_msg([literal("replay"), [revision, low_water_mark,
+ send_deltas]])
self._recv_ack()
feed_editor(self, update_editor)
self._unpack()
@mark_busy
- def replay_range(self, start_revision, end_revision, low_water_mark, cbs,
+ def replay_range(self, start_revision, end_revision, low_water_mark, cbs,
send_deltas=True):
- self.send_msg([literal("replay-range"), [start_revision, end_revision, low_water_mark, send_deltas]])
+ self.send_msg([literal("replay-range"), [start_revision, end_revision,
+ low_water_mark, send_deltas]])
self._recv_ack()
for i in range(start_revision, end_revision+1):
msg = self.recv_msg()
@@ -694,7 +727,7 @@ class SVNClient(SVNConnection):
cbs[1](i, dict(msg[1]), edit)
self._unpack()
- def do_switch(self, revision_to_update_to, update_target, recurse,
+ def do_switch(self, revision_to_update_to, update_target, recurse,
switch_url, update_editor, depth=None):
args = []
if revision_to_update_to is None or revision_to_update_to == -1:
@@ -716,7 +749,7 @@ class SVNClient(SVNConnection):
self.busy = False
raise
- def do_update(self, revision_to_update_to, update_target, recurse,
+ def do_update(self, revision_to_update_to, update_target, recurse,
update_editor, depth=None):
args = []
if revision_to_update_to is None or revision_to_update_to == -1:
@@ -738,13 +771,15 @@ class SVNClient(SVNConnection):
raise
def do_diff(self, revision_to_update, diff_target, versus_url, diff_editor,
- recurse=True, ignore_ancestry=False, text_deltas=False, depth=None):
+ recurse=True, ignore_ancestry=False, text_deltas=False,
+ depth=None):
args = []
if revision_to_update is None or revision_to_update == -1:
args.append([])
else:
args.append([revision_to_update])
- args += [diff_target, recurse, ignore_ancestry, versus_url, text_deltas]
+ args += [diff_target, recurse, ignore_ancestry, versus_url,
+ text_deltas]
if depth is not None:
args.append(literal(depth))
self.busy = True
@@ -782,9 +817,9 @@ class SVNClient(SVNConnection):
return self._uuid
@mark_busy
- def log(self, paths, start, end, limit=0,
- discover_changed_paths=True, strict_node_history=True,
- include_merged_revisions=True, revprops=None):
+ def log(self, paths, start, end, limit=0, discover_changed_paths=True,
+ strict_node_history=True, include_merged_revisions=True,
+ revprops=None):
args = [paths]
if start is None or start == -1:
args.append([])
@@ -825,7 +860,8 @@ class SVNClient(SVNConnection):
if len(msg) > 6 and msg[6]:
revno = None
else:
- revno = msg[1]
+ revno = msg[1] # noqa: F841
+ # TODO(jelmer): Do something with revno
revprops = {}
if len(msg[2]) != 0:
revprops[properties.PROP_REVISION_AUTHOR] = msg[2][0]
@@ -845,7 +881,7 @@ class SVNClient(SVNConnection):
callback(paths, rev, props)
else:
callback(paths, rev, props, has_children)
-
+
MIN_VERSION = 2
MAX_VERSION = 2
@@ -861,9 +897,8 @@ class SVNServer(SVNConnection):
self._logf = logf
super(SVNServer, self).__init__(recv_fn, send_fn)
- def send_greeting(self):
self.send_success(
- MIN_VERSION, MAX_VERSION, [literal(x) for x in MECHANISMS],
+ MIN_VERSION, MAX_VERSION, [literal(x) for x in MECHANISMS],
[literal(x) for x in CAPABILITIES])
def send_mechs(self):
@@ -876,8 +911,9 @@ class SVNServer(SVNConnection):
self.send_success([], "")
def send_unknown(self, cmd):
- self.send_failure([ERR_RA_SVN_UNKNOWN_CMD,
- "Unknown command '%s'" % cmd, __file__, 52])
+ self.send_failure(
+ [ERR_RA_SVN_UNKNOWN_CMD,
+ "Unknown command '%s'" % cmd, __file__, 52])
def get_latest_rev(self):
self.send_ack()
@@ -890,13 +926,14 @@ class SVNServer(SVNConnection):
revnum = rev[0]
kind = self.repo_backend.check_path(path, revnum)
self.send_ack()
- self.send_success(literal({NODE_NONE: "none",
- NODE_DIR: "dir",
- NODE_FILE: "file",
- NODE_UNKNOWN: "unknown"}[kind]))
-
- def log(self, target_path, start_rev, end_rev, changed_paths,
- strict_node, limit=None, include_merged_revisions=False,
+ self.send_success(literal({
+ NODE_NONE: "none",
+ NODE_DIR: "dir",
+ NODE_FILE: "file",
+ NODE_UNKNOWN: "unknown"}[kind]))
+
+ def log(self, target_path, start_rev, end_rev, changed_paths,
+ strict_node, limit=None, include_merged_revisions=False,
all_revprops=None, revprops=None):
def send_revision(revno, author, date, message, changed_paths=None):
changes = []
@@ -916,14 +953,15 @@ class SVNServer(SVNConnection):
end_revnum = None
else:
end_revnum = end_rev[0]
- self.repo_backend.log(send_revision, target_path, start_revnum,
+ self.repo_backend.log(send_revision, target_path, start_revnum,
end_revnum, changed_paths, strict_node, limit)
self.send_msg(literal("done"))
self.send_success()
def open_backend(self, url):
(rooturl, location) = urlparse.splithost(url)
- self.repo_backend, self.relpath = self.backend.open_repository(location)
+ self.repo_backend, self.relpath = self.backend.open_repository(
+ location)
def reparent(self, parent):
self.open_backend(parent)
@@ -941,7 +979,7 @@ class SVNServer(SVNConnection):
self.send_success([])
else:
args = [dirent["name"], dirent["kind"], dirent["size"],
- dirent["has-props"], dirent["created-rev"]]
+ dirent["has-props"], dirent["created-rev"]]
if "created-date" in dirent:
args.append([dirent["created-date"]])
else:
@@ -953,8 +991,8 @@ class SVNServer(SVNConnection):
self.send_success([args])
def commit(self, logmsg, locks, keep_locks=False, rev_props=None):
- self.send_failure([ERR_UNSUPPORTED_FEATURE,
- "commit not yet supported", __file__, 42])
+ self.send_failure([ERR_UNSUPPORTED_FEATURE,
+ "commit not yet supported", __file__, 42])
def rev_proplist(self, revnum):
self.send_ack()
@@ -977,7 +1015,8 @@ class SVNServer(SVNConnection):
self.send_msg(literal("done"))
self.send_success()
- def update(self, rev, target, recurse, depth=None, send_copyfrom_param=True):
+ def update(self, rev, target, recurse, depth=None,
+ send_copyfrom_param=True):
self.send_ack()
while True:
msg = self.recv_msg()
@@ -997,7 +1036,8 @@ class SVNServer(SVNConnection):
if client_result[0] == "success":
return
else:
- self.mutter("Client reported error during update: %r" % client_result)
+ self.mutter("Client reported error during update: %r" %
+ client_result)
# Needs to be sent back to the client to display
self.send_failure(client_result[1][0])
@@ -1053,7 +1093,7 @@ class SVNServer(SVNConnection):
# Expect:
while not self._stop:
- ( cmd, args ) = self.recv_msg()
+ (cmd, args) = self.recv_msg()
if cmd not in self.commands:
self.mutter("client used unknown command %r" % cmd)
self.send_unknown(cmd)
@@ -1073,11 +1113,12 @@ class TCPSVNRequestHandler(StreamRequestHandler):
def __init__(self, request, client_address, server):
self._server = server
- StreamRequestHandler.__init__(self, request,
- client_address, server)
+ StreamRequestHandler.__init__(
+ self, request, client_address, server)
def handle(self):
- server = SVNServer(self._server._backend, self.rfile.read,
+ server = SVNServer(
+ self._server._backend, self.rfile.read,
self.wfile.write, self._server._logf)
try:
server.serve()
@@ -1096,4 +1137,3 @@ class TCPSVNServer(TCPServer):
self._logf = logf
self._backend = backend
TCPServer.__init__(self, addr, TCPSVNRequestHandler)
-
diff --git a/subvertpy/server.py b/subvertpy/server.py
index e6ab2f4f..02283eab 100644
--- a/subvertpy/server.py
+++ b/subvertpy/server.py
@@ -16,6 +16,7 @@
"""Server backend base classes."""
+
class ServerBackend(object):
"""A server backend."""
@@ -30,7 +31,7 @@ def generate_random_id():
class ServerRepositoryBackend(object):
-
+
def get_uuid(self):
raise NotImplementedError(self.get_uuid)
@@ -60,6 +61,3 @@ class ServerRepositoryBackend(object):
def get_locations(self, path, peg_revnum, revnums):
raise NotImplementedError(self.get_locations)
-
-
-
diff --git a/subvertpy/tests/__init__.py b/subvertpy/tests/__init__.py
index c668bc8e..7fb1e008 100644
--- a/subvertpy/tests/__init__.py
+++ b/subvertpy/tests/__init__.py
@@ -54,8 +54,9 @@ def rmtree_with_readonly(path):
In Windows a read-only file cannot be removed, and shutil.rmtree fails.
"""
def force_rm_handle(remove_path, path, excinfo):
- os.chmod(path, os.stat(path).st_mode | stat.S_IWUSR | stat.S_IWGRP |
- stat.S_IWOTH)
+ os.chmod(
+ path,
+ os.stat(path).st_mode | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
remove_path(path)
shutil.rmtree(path, onerror=force_rm_handle)
@@ -69,8 +70,9 @@ class TestCase(unittest.TestCase):
def assertIsInstance(self, obj, kls, msg=None):
"""Fail if obj is not an instance of kls"""
if not isinstance(obj, kls):
- if msg is None: msg = "%r is an instance of %s rather than %s" % (
- obj, obj.__class__, kls)
+ if msg is None:
+ msg = "%r is an instance of %s rather than %s" % (
+ obj, obj.__class__, kls)
self.fail(msg)
def assertIs(self, left, right, message=None):
@@ -145,7 +147,8 @@ class TestDirEditor(object):
def open_dir(self, path):
self.close_children()
- child = TestDirEditor(self.dir.open_directory(path, -1), self.baseurl, self.revnum)
+ child = TestDirEditor(self.dir.open_directory(path, -1), self.baseurl,
+ self.revnum)
self.children.append(child)
return child
@@ -164,7 +167,7 @@ class TestDirEditor(object):
assert (copyfrom_path is None and copyfrom_rev == -1) or \
(copyfrom_path is not None and copyfrom_rev > -1)
child = TestDirEditor(self.dir.add_directory(path, copyfrom_path,
- copyfrom_rev), self.baseurl, self.revnum)
+ copyfrom_rev), self.baseurl, self.revnum)
self.children.append(child)
return child
@@ -175,7 +178,7 @@ class TestDirEditor(object):
if copyfrom_path is not None and copyfrom_rev == -1:
copyfrom_rev = self.revnum
child = TestFileEditor(self.dir.add_file(path, copyfrom_path,
- copyfrom_rev))
+ copyfrom_rev))
self.children.append(child)
return child
@@ -208,7 +211,7 @@ class SubversionTestCase(TestCaseInTempDir):
ra.get_ssl_client_cert_pw_file_provider(),
ra.get_ssl_server_trust_file_provider()])
self.client_ctx.log_msg_func = self.log_message_func
- #self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err)
+ # self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err)
def setUp(self):
super(SubversionTestCase, self).setUp()
@@ -232,16 +235,16 @@ class SubversionTestCase(TestCaseInTempDir):
if allow_revprop_changes:
if sys.platform == 'win32':
- revprop_hook = os.path.join(abspath, "hooks",
- "pre-revprop-change.bat")
+ revprop_hook = os.path.join(
+ abspath, "hooks", "pre-revprop-change.bat")
f = open(revprop_hook, 'w')
try:
f.write("exit 0\n")
finally:
f.close()
else:
- revprop_hook = os.path.join(abspath, "hooks",
- "pre-revprop-change")
+ revprop_hook = os.path.join(
+ abspath, "hooks", "pre-revprop-change")
f = open(revprop_hook, 'w')
try:
f.write("#!/bin/sh\n")
@@ -254,7 +257,6 @@ class SubversionTestCase(TestCaseInTempDir):
else:
return "file://%s" % abspath
-
def make_checkout(self, repos_url, relpath):
"""Create a new checkout."""
self.client_ctx.checkout(repos_url, relpath, "HEAD")
@@ -334,15 +336,17 @@ class SubversionTestCase(TestCaseInTempDir):
r = ra.RemoteAccess(url)
assert isinstance(url, str)
ret = {}
+
def rcvr(orig_paths, rev, revprops, has_children=None):
- ret[rev] = (orig_paths,
+ ret[rev] = (
+ orig_paths,
revprops.get(properties.PROP_REVISION_AUTHOR),
revprops.get(properties.PROP_REVISION_DATE),
revprops.get(properties.PROP_REVISION_LOG))
r.get_log(rcvr, [""], start_revnum, stop_revnum, 0, True, True,
revprops=[properties.PROP_REVISION_AUTHOR,
- properties.PROP_REVISION_DATE,
- properties.PROP_REVISION_LOG])
+ properties.PROP_REVISION_DATE,
+ properties.PROP_REVISION_LOG])
return ret
def client_delete(self, relpath):
@@ -402,8 +406,8 @@ class SubversionTestCase(TestCaseInTempDir):
:param clientpath: Path to checkout
:return: Repository URL.
"""
- repos_url = self.make_repository(repospath,
- allow_revprop_changes=allow_revprop_changes)
+ repos_url = self.make_repository(
+ repospath, allow_revprop_changes=allow_revprop_changes)
self.make_checkout(repos_url, clientpath)
return repos_url
@@ -421,11 +425,11 @@ class SubversionTestCase(TestCaseInTempDir):
:param message: Commit message
:return: Commit editor object
"""
- ra_ctx = RemoteAccess(url.encode("utf-8"),
- auth=Auth([ra.get_username_provider()]))
+ ra_ctx = RemoteAccess(
+ url.encode("utf-8"), auth=Auth([ra.get_username_provider()]))
revnum = ra_ctx.get_latest_revnum()
- return TestCommitEditor(ra_ctx.get_commit_editor({"svn:log": message}),
- ra_ctx.url, revnum)
+ return TestCommitEditor(ra_ctx.get_commit_editor(
+ {"svn:log": message}), ra_ctx.url, revnum)
def test_suite():
diff --git a/subvertpy/tests/test_client.py b/subvertpy/tests/test_client.py
index f7566422..a1a0534d 100644
--- a/subvertpy/tests/test_client.py
+++ b/subvertpy/tests/test_client.py
@@ -74,8 +74,9 @@ class TestClient(SubversionTestCase):
def test_commit_start(self):
self.build_tree({"dc/foo": None})
- self.client = client.Client(auth=ra.Auth([ra.get_username_provider()]),
- log_msg_func=lambda c: "Bmessage")
+ self.client = client.Client(
+ auth=ra.Auth([ra.get_username_provider()]),
+ log_msg_func=lambda c: "Bmessage")
self.client.add("dc/foo")
self.client.commit(["dc"])
r = ra.RemoteAccess(self.repos_url)
@@ -98,7 +99,8 @@ class TestClient(SubversionTestCase):
self.build_tree({"dc/foo": b"bla"})
self.client.add("dc/foo")
self.client.commit(["dc"])
- self.client.export(self.repos_url, "de", ignore_externals=True, ignore_keywords=True)
+ self.client.export(self.repos_url, "de", ignore_externals=True,
+ ignore_keywords=True)
self.assertEqual(["foo"], os.listdir("de"))
def test_add_recursive(self):
@@ -135,8 +137,6 @@ class TestClient(SubversionTestCase):
shutil.rmtree(base_dir)
def test_diff(self):
- r = ra.RemoteAccess(self.repos_url,
- auth=ra.Auth([ra.get_username_provider()]))
dc = self.get_commit_editor(self.repos_url)
f = dc.add_file("foo")
f.modify(b"foo1")
@@ -148,9 +148,10 @@ class TestClient(SubversionTestCase):
dc.close()
if client.api_version() < (1, 5):
- self.assertRaises(NotImplementedError, self.client.diff, 1, 2,
+ self.assertRaises(
+ NotImplementedError, self.client.diff, 1, 2,
self.repos_url, self.repos_url)
- return # Skip test
+ return # Skip test
(outf, errf) = self.client.diff(1, 2, self.repos_url, self.repos_url)
self.addCleanup(outf.close)
@@ -203,6 +204,7 @@ class TestClient(SubversionTestCase):
commit_msg_1 = b"Commit"
commit_msg_2 = b"Commit 2"
delta = timedelta(hours=1)
+
def cb(changed_paths, revision, revprops, has_children=False):
log_entries.append({
'changed_paths': changed_paths,
@@ -241,7 +243,8 @@ class TestClient(SubversionTestCase):
self.assertLogEntryMessageEquals(commit_msg_1, log_entries[1])
self.assertLogEntryDateAlmostEquals(commit_1_dt, log_entries[1], delta)
log_entries = []
- self.client.log(cb, "dc/foo", start_rev=2, end_rev=2, discover_changed_paths=True)
+ self.client.log(cb, "dc/foo", start_rev=2, end_rev=2,
+ discover_changed_paths=True)
self.assertEqual(1, len(log_entries))
self.assertLogEntryChangedPathsEquals(["/foo", "/bar"], log_entries[0])
self.assertEqual(2, log_entries[0]["revision"])
diff --git a/subvertpy/tests/test_core.py b/subvertpy/tests/test_core.py
index fda776c6..f608aca3 100644
--- a/subvertpy/tests/test_core.py
+++ b/subvertpy/tests/test_core.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -18,10 +18,12 @@
import subvertpy
from subvertpy.tests import TestCase
+
class TestCore(TestCase):
def setUp(self):
super(TestCore, self).setUp()
def test_exc(self):
- self.assertTrue(isinstance(subvertpy.SubversionException("foo", 1), Exception))
+ self.assertTrue(
+ isinstance(subvertpy.SubversionException("foo", 1), Exception))
diff --git a/subvertpy/tests/test_delta.py b/subvertpy/tests/test_delta.py
index 4dbec0dd..20853d1e 100644
--- a/subvertpy/tests/test_delta.py
+++ b/subvertpy/tests/test_delta.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -28,6 +28,7 @@ from subvertpy.delta import (
)
from subvertpy.tests import TestCase
+
class DeltaTests(TestCase):
def setUp(self):
@@ -40,8 +41,8 @@ class DeltaTests(TestCase):
def test_send_stream(self):
stream = BytesIO(b"foo")
send_stream(stream, self.storing_window_handler)
- self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None],
- self.windows)
+ self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None],
+ self.windows)
def test_apply_delta(self):
stream = BytesIO()
@@ -74,7 +75,8 @@ class MarshallTests(TestCase):
def test_roundtrip_length(self):
self.assertEqual((42, bytes()), decode_length(encode_length(42)))
-
def test_roundtrip_window(self):
mywindow = (0, 0, 3, 1, [(2, 0, 3)], b'foo')
- self.assertEqual([mywindow], list(unpack_svndiff0(pack_svndiff0([mywindow]))))
+ self.assertEqual(
+ [mywindow],
+ list(unpack_svndiff0(pack_svndiff0([mywindow]))))
diff --git a/subvertpy/tests/test_marshall.py b/subvertpy/tests/test_marshall.py
index 5ccaa980..751276f2 100644
--- a/subvertpy/tests/test_marshall.py
+++ b/subvertpy/tests/test_marshall.py
@@ -24,6 +24,7 @@ from subvertpy.marshall import (
)
from subvertpy.tests import TestCase
+
class TestMarshalling(TestCase):
def test_literal_txt(self):
@@ -41,15 +42,15 @@ class TestMarshalling(TestCase):
def test_marshall_error(self):
e = MarshallError("bla bla")
self.assertEqual("bla bla", e.__str__())
-
+
def test_marshall_int(self):
self.assertEqual(b"1 ", marshall(1))
def test_marshall_list(self):
- self.assertEqual(b"( 1 2 3 4 ) ", marshall([1,2,3,4]))
-
+ self.assertEqual(b"( 1 2 3 4 ) ", marshall([1, 2, 3, 4]))
+
def test_marshall_list_mixed(self):
- self.assertEqual(b"( 1 3 4 3:str ) ", marshall([1,3,4,"str"]))
+ self.assertEqual(b"( 1 3 4 3:str ) ", marshall([1, 3, 4, "str"]))
def test_marshall_literal(self):
self.assertEqual(b"foo ", marshall(literal("foo")))
@@ -87,9 +88,8 @@ class TestMarshalling(TestCase):
def test_unmarshall_toolong(self):
self.assertRaises(MarshallError, unmarshall, b"43432432:bla")
- def test_unmarshall_literal(self):
+ def test_unmarshall_literal_negative(self):
self.assertRaises(MarshallError, unmarshall, b":-3213")
def test_unmarshall_open_list(self):
self.assertRaises(MarshallError, unmarshall, b"( 3 4 ")
-
diff --git a/subvertpy/tests/test_properties.py b/subvertpy/tests/test_properties.py
index 5e3aea44..996f03a3 100644
--- a/subvertpy/tests/test_properties.py
+++ b/subvertpy/tests/test_properties.py
@@ -23,13 +23,16 @@ from subvertpy.tests import (
TestCase,
)
+
class TestProperties(TestCase):
def setUp(self):
super(TestProperties, self).setUp()
def test_time_from_cstring(self):
- self.assertEqual(1225704780716938, properties.time_from_cstring("2008-11-03T09:33:00.716938Z"))
+ self.assertEqual(
+ 1225704780716938,
+ properties.time_from_cstring("2008-11-03T09:33:00.716938Z"))
def test_time_from_cstring_independent_from_dst(self):
old_tz = os.environ.get('TZ', None)
@@ -38,11 +41,14 @@ class TestProperties(TestCase):
raise SkipTest("tzset not available on Windows")
try:
- # First specify a fixed timezone with known DST (late March to late October)
+ # First specify a fixed timezone with known DST (late March to late
+ # October)
os.environ['TZ'] = 'Europe/London'
time.tzset()
# Now test a time within that DST
- self.assertEqual(1275295762430000, properties.time_from_cstring("2010-05-31T08:49:22.430000Z"))
+ self.assertEqual(
+ 1275295762430000,
+ properties.time_from_cstring("2010-05-31T08:49:22.430000Z"))
finally:
if old_tz is None:
del os.environ['TZ']
@@ -51,46 +57,56 @@ class TestProperties(TestCase):
time.tzset()
def test_time_to_cstring(self):
- self.assertEqual("2008-11-03T09:33:00.716938Z", properties.time_to_cstring(1225704780716938))
+ self.assertEqual(
+ "2008-11-03T09:33:00.716938Z",
+ properties.time_to_cstring(1225704780716938))
class TestExternalsParser(TestCase):
def test_parse_root_relative_externals(self):
- self.assertRaises(NotImplementedError, properties.parse_externals_description,
- "http://example.com", "third-party/skins ^/foo")
+ self.assertRaises(
+ NotImplementedError,
+ properties.parse_externals_description,
+ "http://example.com", "third-party/skins ^/foo")
def test_parse_scheme_relative_externals(self):
- self.assertRaises(NotImplementedError, properties.parse_externals_description,
- "http://example.com", "third-party/skins //foo")
+ self.assertRaises(
+ NotImplementedError, properties.parse_externals_description,
+ "http://example.com", "third-party/skins //foo")
def test_parse_externals(self):
self.assertEqual({
- 'third-party/sounds': (None, "http://sounds.red-bean.com/repos"),
- 'third-party/skins': (None, "http://skins.red-bean.com/repositories/skinproj"),
- 'third-party/skins/toolkit': (21, "http://svn.red-bean.com/repos/skin-maker")},
- properties.parse_externals_description("http://example.com",
-"""third-party/sounds http://sounds.red-bean.com/repos
-third-party/skins http://skins.red-bean.com/repositories/skinproj
+ 'third-party/sounds':
+ (None, "http://sounds.red-bean.com/repos"),
+ 'third-party/skins':
+ (None, "http://skins.red-bean.com/repositories/skinproj"),
+ 'third-party/skins/toolkit':
+ (21, "http://svn.red-bean.com/repos/skin-maker")},
+ properties.parse_externals_description(
+ "http://example.com", """\
+third-party/sounds http://sounds.red-bean.com/repos
+third-party/skins http://skins.red-bean.com/repositories/skinproj
third-party/skins/toolkit -r21 http://svn.red-bean.com/repos/skin-maker"""))
def test_parse_externals_space_revno(self):
self.assertEqual({
- 'third-party/skins/toolkit': (21, "http://svn.red-bean.com/repos/skin-maker")},
- properties.parse_externals_description("http://example.com",
-"""third-party/skins/toolkit -r 21 http://svn.red-bean.com/repos/skin-maker"""))
+ 'third-party/skins/toolkit':
+ (21, "http://svn.red-bean.com/repos/skin-maker")},
+ properties.parse_externals_description("http://example.com", """\
+third-party/skins/toolkit -r 21 http://svn.red-bean.com/repos/skin-maker"""))
def test_parse_externals_swapped(self):
- self.assertEqual({'third-party/sounds': (None, "http://sounds.red-bean.com/repos")},
- properties.parse_externals_description("http://example.com",
-"""http://sounds.red-bean.com/repos third-party/sounds
+ self.assertEqual(
+ {'third-party/sounds': (None, "http://sounds.red-bean.com/repos")},
+ properties.parse_externals_description("http://example.com", """\
+http://sounds.red-bean.com/repos third-party/sounds
"""))
def test_parse_comment(self):
self.assertEqual({
'third-party/sounds': (None, "http://sounds.red-bean.com/repos")
},
- properties.parse_externals_description("http://example.com/",
-"""
+ properties.parse_externals_description("http://example.com/", """\
third-party/sounds http://sounds.red-bean.com/repos
#third-party/skins http://skins.red-bean.com/repositories/skinproj
@@ -100,36 +116,49 @@ third-party/sounds http://sounds.red-bean.com/repos
self.assertEqual({
'third-party/sounds': (None, "http://example.com/branches/other"),
},
- properties.parse_externals_description("http://example.com/trunk",
-"third-party/sounds ../branches/other"))
+ properties.parse_externals_description(
+ "http://example.com/trunk",
+ "third-party/sounds ../branches/other"))
def test_parse_repos_root_relative(self):
self.assertEqual({
- 'third-party/sounds': (None, "http://example.com/bar/bla/branches/other"),
+ 'third-party/sounds':
+ (None, "http://example.com/bar/bla/branches/other"),
},
- properties.parse_externals_description("http://example.com/trunk",
-"third-party/sounds /bar/bla/branches/other"))
+ properties.parse_externals_description(
+ "http://example.com/trunk",
+ "third-party/sounds /bar/bla/branches/other"))
def test_parse_invalid_missing_url(self):
"""No URL specified."""
- self.assertRaises(properties.InvalidExternalsDescription,
- lambda: properties.parse_externals_description("http://example.com/", "bla"))
+ self.assertRaises(
+ properties.InvalidExternalsDescription,
+ lambda: properties.parse_externals_description(
+ "http://example.com/", "bla"))
def test_parse_invalid_too_much_data(self):
"""No URL specified."""
- self.assertRaises(properties.InvalidExternalsDescription,
- lambda: properties.parse_externals_description(None, "bla -R40 http://bla/"))
+ self.assertRaises(
+ properties.InvalidExternalsDescription,
+ lambda: properties.parse_externals_description(
+ None, "bla -R40 http://bla/"))
class MergeInfoPropertyParserTests(TestCase):
def test_simple_range(self):
- self.assertEqual({"/trunk": [(1, 2, True)]}, properties.parse_mergeinfo_property("/trunk:1-2\n"))
+ self.assertEqual(
+ {"/trunk": [(1, 2, True)]},
+ properties.parse_mergeinfo_property("/trunk:1-2\n"))
def test_simple_range_uninheritable(self):
- self.assertEqual({"/trunk": [(1, 2, False)]}, properties.parse_mergeinfo_property("/trunk:1-2*\n"))
+ self.assertEqual(
+ {"/trunk": [(1, 2, False)]},
+ properties.parse_mergeinfo_property("/trunk:1-2*\n"))
def test_simple_individual(self):
- self.assertEqual({"/trunk": [(1, 1, True)]}, properties.parse_mergeinfo_property("/trunk:1\n"))
+ self.assertEqual(
+ {"/trunk": [(1, 1, True)]},
+ properties.parse_mergeinfo_property("/trunk:1\n"))
def test_empty(self):
self.assertEqual({}, properties.parse_mergeinfo_property(""))
@@ -137,10 +166,15 @@ class MergeInfoPropertyParserTests(TestCase):
class MergeInfoPropertyCreatorTests(TestCase):
def test_simple_range(self):
- self.assertEqual("/trunk:1-2\n", properties.generate_mergeinfo_property({"/trunk": [(1, 2, True)]}))
+ self.assertEqual(
+ "/trunk:1-2\n",
+ properties.generate_mergeinfo_property({"/trunk": [(1, 2, True)]}))
def test_simple_individual(self):
- self.assertEqual("/trunk:1\n", properties.generate_mergeinfo_property({"/trunk": [(1, 1, True)]}))
+ self.assertEqual(
+ "/trunk:1\n",
+ properties.generate_mergeinfo_property(
+ {"/trunk": [(1, 1, True)]}))
def test_empty(self):
self.assertEqual("", properties.generate_mergeinfo_property({}))
@@ -151,39 +185,62 @@ class RevnumRangeTests(TestCase):
self.assertEqual([(1, 1, True)], properties.range_add_revnum([], 1))
def test_add_revnum_before(self):
- self.assertEqual([(2, 2, True), (8, 8, True)], properties.range_add_revnum([(2, 2, True)], 8))
+ self.assertEqual(
+ [(2, 2, True), (8, 8, True)],
+ properties.range_add_revnum([(2, 2, True)], 8))
def test_add_revnum_included(self):
- self.assertEqual([(1, 3, True)], properties.range_add_revnum([(1, 3, True)], 2))
+ self.assertEqual(
+ [(1, 3, True)],
+ properties.range_add_revnum([(1, 3, True)], 2))
def test_add_revnum_after(self):
- self.assertEqual([(1, 3, True), (5, 5, True)], properties.range_add_revnum([(1, 3, True)], 5))
+ self.assertEqual(
+ [(1, 3, True), (5, 5, True)],
+ properties.range_add_revnum([(1, 3, True)], 5))
def test_add_revnum_extend_before(self):
- self.assertEqual([(1, 3, True)], properties.range_add_revnum([(2, 3, True)], 1))
+ self.assertEqual(
+ [(1, 3, True)],
+ properties.range_add_revnum([(2, 3, True)], 1))
def test_add_revnum_extend_after(self):
- self.assertEqual([(1, 3, True)], properties.range_add_revnum([(1, 2, True)], 3))
+ self.assertEqual(
+ [(1, 3, True)],
+ properties.range_add_revnum([(1, 2, True)], 3))
def test_revnum_includes_empty(self):
self.assertFalse(properties.range_includes_revnum([], 2))
def test_revnum_includes_oor(self):
- self.assertFalse(properties.range_includes_revnum([(1, 3, True), (4, 5, True)], 10))
+ self.assertFalse(
+ properties.range_includes_revnum(
+ [(1, 3, True), (4, 5, True)], 10))
def test_revnum_includes_in(self):
- self.assertTrue(properties.range_includes_revnum([(1, 3, True), (4, 5, True)], 2))
+ self.assertTrue(
+ properties.range_includes_revnum(
+ [(1, 3, True), (4, 5, True)], 2))
class MergeInfoIncludeTests(TestCase):
+
def test_includes_individual(self):
- self.assertTrue(properties.mergeinfo_includes_revision({"/trunk": [(1, 1, True)]}, "/trunk", 1))
+ self.assertTrue(
+ properties.mergeinfo_includes_revision(
+ {"/trunk": [(1, 1, True)]}, "/trunk", 1))
def test_includes_range(self):
- self.assertTrue(properties.mergeinfo_includes_revision({"/trunk": [(1, 5, True)]}, "/trunk", 3))
+ self.assertTrue(
+ properties.mergeinfo_includes_revision(
+ {"/trunk": [(1, 5, True)]}, "/trunk", 3))
def test_includes_invalid_path(self):
- self.assertFalse(properties.mergeinfo_includes_revision({"/somepath": [(1, 5, True)]}, "/trunk", 3))
+ self.assertFalse(
+ properties.mergeinfo_includes_revision(
+ {"/somepath": [(1, 5, True)]}, "/trunk", 3))
def test_includes_invalid_revnum(self):
- self.assertFalse(properties.mergeinfo_includes_revision({"/trunk": [(1, 5, True)]}, "/trunk", 30))
+ self.assertFalse(
+ properties.mergeinfo_includes_revision(
+ {"/trunk": [(1, 5, True)]}, "/trunk", 30))
diff --git a/subvertpy/tests/test_ra.py b/subvertpy/tests/test_ra.py
index 5cbefcbb..51ad42b3 100644
--- a/subvertpy/tests/test_ra.py
+++ b/subvertpy/tests/test_ra.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -27,6 +27,7 @@ from subvertpy.tests import (
TestCase,
)
+
class VersionTest(TestCase):
def test_version_length(self):
@@ -50,8 +51,8 @@ class TestRemoteAccess(SubversionTestCase):
def setUp(self):
super(TestRemoteAccess, self).setUp()
self.repos_url = self.make_repository("d")
- self.ra = ra.RemoteAccess(self.repos_url,
- auth=ra.Auth([ra.get_username_provider()]))
+ self.ra = ra.RemoteAccess(
+ self.repos_url, auth=ra.Auth([ra.get_username_provider()]))
def tearDown(self):
del self.ra
@@ -67,7 +68,7 @@ class TestRemoteAccess(SubversionTestCase):
def test_repr(self):
self.assertEqual("RemoteAccess(\"%s\")" % self.repos_url,
- repr(self.ra))
+ repr(self.ra))
def test_latest_revnum(self):
self.assertEqual(0, self.ra.get_latest_revnum())
@@ -93,9 +94,11 @@ class TestRemoteAccess(SubversionTestCase):
def test_has_capability(self):
if ra.api_version() < (1, 5):
- self.assertRaises(NotImplementedError, self.ra.has_capability, "FOO")
+ self.assertRaises(NotImplementedError, self.ra.has_capability,
+ "FOO")
else:
- self.assertRaises(SubversionException, self.ra.has_capability, "FOO")
+ self.assertRaises(SubversionException, self.ra.has_capability,
+ "FOO")
def test_get_dir(self):
ret = self.ra.get_dir("", 0)
@@ -107,7 +110,8 @@ class TestRemoteAccess(SubversionTestCase):
def test_get_dir_kind(self):
self.do_commit()
- (dirents, fetch_rev, props) = self.ra.get_dir("/", 1, fields=ra.DIRENT_KIND)
+ (dirents, fetch_rev, props) = self.ra.get_dir(
+ "/", 1, fields=ra.DIRENT_KIND)
self.assertIsInstance(props, dict)
self.assertEqual(1, fetch_rev)
self.assertEqual(NODE_DIR, dirents["foo"]["kind"])
@@ -123,19 +127,28 @@ class TestRemoteAccess(SubversionTestCase):
self.do_commit()
class MyFileEditor:
- def change_prop(self, name, val): pass
+
+ def change_prop(self, name, val): pass
+
def close(self, checksum=None): pass
class MyDirEditor:
- def change_prop(self, name, val): pass
+
+ def change_prop(self, name, val): pass
+
def add_directory(self, *args): return MyDirEditor()
+
def add_file(self, *args): return MyFileEditor()
+
def close(self): pass
class MyEditor:
- def set_target_revision(self, rev): pass
+
+ def set_target_revision(self, rev): pass
+
def open_root(self, base_rev):
return MyDirEditor()
+
def close(self): pass
reporter = self.ra.do_diff(1, "", self.ra.get_repos_root(), MyEditor())
reporter.set_path("", 0, True)
@@ -145,14 +158,15 @@ class TestRemoteAccess(SubversionTestCase):
def test_iter_log_invalid(self):
self.assertRaises(SubversionException, list, self.ra.iter_log(
- ["idontexist"], 0, 0, revprops=["svn:date", "svn:author", "svn:log"]))
+ ["idontexist"], 0, 0, revprops=[
+ "svn:date", "svn:author", "svn:log"]))
self.assertRaises(SubversionException, list, self.ra.iter_log(
[""], 0, 1000, revprops=["svn:date", "svn:author", "svn:log"]))
def test_iter_log(self):
def check_results(returned):
self.assertEqual(2, len(returned))
- self.assertTrue(len(returned[0]) in (3,4))
+ self.assertTrue(len(returned[0]) in (3, 4))
if len(returned[0]) == 3:
(paths, revnum, props) = returned[0]
else:
@@ -165,27 +179,33 @@ class TestRemoteAccess(SubversionTestCase):
else:
(paths, revnum, props, has_children) = returned[1]
if ra.api_version() < (1, 6):
- self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, paths)
+ self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)},
+ paths)
else:
self.assertEqual({'/foo': ('A', None, -1, NODE_DIR)}, paths)
self.assertEqual(revnum, 1)
- self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
- set(props.keys()))
- returned = list(self.ra.iter_log([""], 0, 0,
- revprops=["svn:date", "svn:author", "svn:log"]))
+ self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
+ set(props.keys()))
+
+ returned = list(self.ra.iter_log(
+ [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"]))
self.assertEqual(1, len(returned))
self.do_commit()
- returned = list(self.ra.iter_log(None, 0, 1, discover_changed_paths=True,
- strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"]))
+ returned = list(self.ra.iter_log(
+ None, 0, 1, discover_changed_paths=True,
+ strict_node_history=False,
+ revprops=["svn:date", "svn:author", "svn:log"]))
check_results(returned)
def test_get_log(self):
returned = []
+
def cb(*args):
returned.append(args)
+
def check_results(returned):
self.assertEqual(2, len(returned))
- self.assertTrue(len(returned[0]) in (3,4))
+ self.assertTrue(len(returned[0]) in (3, 4))
if len(returned[0]) == 3:
(paths, revnum, props) = returned[0]
else:
@@ -199,22 +219,28 @@ class TestRemoteAccess(SubversionTestCase):
(paths, revnum, props, has_children) = returned[1]
self.assertEqual({'/foo': ('A', None, -1)}, paths)
self.assertEqual(revnum, 1)
- self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
+ self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
set(props.keys()))
- self.ra.get_log(cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
+ self.ra.get_log(cb, [""], 0, 0,
+ revprops=["svn:date", "svn:author", "svn:log"])
self.assertEqual(1, len(returned))
self.do_commit()
returned = []
- self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True,
- strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"])
+ self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True,
+ strict_node_history=False,
+ revprops=["svn:date", "svn:author", "svn:log"])
check_results(returned)
def test_get_log_cancel(self):
+
def cb(*args):
raise KeyError
+
self.do_commit()
- self.assertRaises(KeyError,
- self.ra.get_log, cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
+ self.assertRaises(
+ KeyError,
+ self.ra.get_log, cb, [""], 0, 0,
+ revprops=["svn:date", "svn:author", "svn:log"])
def test_get_commit_editor_double_close(self):
def mycb(*args):
@@ -231,7 +257,8 @@ class TestRemoteAccess(SubversionTestCase):
def mycb(rev):
pass
editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
- self.assertRaises(ra.BusyException, self.ra.get_commit_editor,
+ self.assertRaises(
+ ra.BusyException, self.ra.get_commit_editor,
{"svn:log": "foo"}, mycb)
editor.abort()
@@ -244,11 +271,15 @@ class TestRemoteAccess(SubversionTestCase):
self.assertRaises(RuntimeError, root.add_directory, "foo")
def test_get_commit_editor_custom_revprops(self):
- if ra.version()[:2] < (1,5):
+ if ra.version()[:2] < (1, 5):
return
+
def mycb(paths, rev, revprops):
pass
- editor = self.ra.get_commit_editor({"svn:log": "foo", "bar:foo": "bla", "svn:custom:blie": "bloe"}, mycb)
+
+ editor = self.ra.get_commit_editor(
+ {"svn:log": "foo", "bar:foo": "bla",
+ "svn:custom:blie": "bloe"}, mycb)
root = editor.open_root()
root.add_directory("somedir").close()
root.close()
@@ -256,7 +287,8 @@ class TestRemoteAccess(SubversionTestCase):
revprops = self.ra.rev_proplist(1)
self.assertEqual(
- set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', 'svn:log']),
+ set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date',
+ 'svn:log']),
set(revprops.keys()), "result: %r" % revprops)
def test_get_commit_editor_context_manager(self):
@@ -357,7 +389,10 @@ class TestRemoteAccess(SubversionTestCase):
cb.close()
ret = self.ra.stat("bar", 1)
- self.assertEqual(set(['last_author', 'kind', 'created_rev', 'has_props', 'time', 'size']), set(ret.keys()))
+ self.assertEqual(
+ set(['last_author', 'kind', 'created_rev', 'has_props', 'time',
+ 'size']),
+ set(ret.keys()))
def test_get_locations_dir(self):
cb = self.commit_editor()
@@ -372,17 +407,21 @@ class TestRemoteAccess(SubversionTestCase):
cb.delete("bar")
cb.close()
- self.assertEqual({1: "/bar", 2: "/bla"},
- self.ra.get_locations("bla", 2, [1,2]))
+ self.assertEqual(
+ {1: "/bar", 2: "/bla"},
+ self.ra.get_locations("bla", 2, [1, 2]))
- self.assertEqual({1: "/bar", 2: "/bar"},
- self.ra.get_locations("bar", 1, [1,2]))
+ self.assertEqual(
+ {1: "/bar", 2: "/bar"},
+ self.ra.get_locations("bar", 1, [1, 2]))
- self.assertEqual({1: "/bar", 2: "/bar"},
- self.ra.get_locations("bar", 2, [1,2]))
+ self.assertEqual(
+ {1: "/bar", 2: "/bar"},
+ self.ra.get_locations("bar", 2, [1, 2]))
- self.assertEqual({1: "/bar", 2: "/bla", 3: "/bla"},
- self.ra.get_locations("bla", 3, [1,2,3]))
+ self.assertEqual(
+ {1: "/bar", 2: "/bla", 3: "/bla"},
+ self.ra.get_locations("bla", 3, [1, 2, 3]))
class AuthTests(TestCase):
@@ -392,47 +431,55 @@ class AuthTests(TestCase):
def test_not_registered(self):
auth = ra.Auth([])
- self.assertRaises(SubversionException, auth.credentials, "svn.simple", "MyRealm")
+ self.assertRaises(
+ SubversionException, auth.credentials, "svn.simple", "MyRealm")
def test_simple(self):
- auth = ra.Auth([ra.get_simple_prompt_provider(lambda realm, uname, may_save: ("foo", "geheim", False), 0)])
+ auth = ra.Auth([ra.get_simple_prompt_provider(
+ lambda realm, uname, may_save: ("foo", "geheim", False), 0)])
creds = auth.credentials("svn.simple", "MyRealm")
self.assertEqual(("foo", "geheim", 0), next(creds))
self.assertRaises(StopIteration, next, creds)
def test_username(self):
- auth = ra.Auth([ra.get_username_prompt_provider(lambda realm, may_save: ("somebody", False), 0)])
+ auth = ra.Auth([ra.get_username_prompt_provider(
+ lambda realm, may_save: ("somebody", False), 0)])
creds = auth.credentials("svn.username", "MyRealm")
self.assertEqual(("somebody", 0), next(creds))
self.assertRaises(StopIteration, next, creds)
def test_client_cert(self):
- auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(lambda realm, may_save: ("filename", False), 0)])
+ auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(
+ lambda realm, may_save: ("filename", False), 0)])
creds = auth.credentials("svn.ssl.client-cert", "MyRealm")
self.assertEqual(("filename", False), next(creds))
self.assertRaises(StopIteration, next, creds)
def test_client_cert_pw(self):
- auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider(lambda realm, may_save: ("supergeheim", False), 0)])
+ auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider(
+ lambda realm, may_save: ("supergeheim", False), 0)])
creds = auth.credentials("svn.ssl.client-passphrase", "MyRealm")
self.assertEqual(("supergeheim", False), next(creds))
self.assertRaises(StopIteration, next, creds)
def test_server_trust(self):
- auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: (42, False))])
+ auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(
+ lambda realm, failures, certinfo, may_save: (42, False))])
auth.set_parameter("svn:auth:ssl:failures", 23)
creds = auth.credentials("svn.ssl.server", "MyRealm")
self.assertEqual((42, 0), next(creds))
self.assertRaises(StopIteration, next, creds)
def test_server_untrust(self):
- auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: None)])
+ auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(
+ lambda realm, failures, certinfo, may_save: None)])
auth.set_parameter("svn:auth:ssl:failures", 23)
creds = auth.credentials("svn.ssl.server", "MyRealm")
self.assertRaises(StopIteration, next, creds)
def test_retry(self):
self.i = 0
+
def inc_foo(realm, may_save):
self.i += 1
return ("somebody%d" % self.i, False)
diff --git a/subvertpy/tests/test_repos.py b/subvertpy/tests/test_repos.py
index 21a0f4f1..ed5f2067 100644
--- a/subvertpy/tests/test_repos.py
+++ b/subvertpy/tests/test_repos.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -46,7 +46,8 @@ class TestRepository(TestCaseInTempDir):
def test_capability(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
if repos.api_version() < (1, 5):
- self.assertRaises(NotImplementedError, r.has_capability, "mergeinfo")
+ self.assertRaises(NotImplementedError, r.has_capability,
+ "mergeinfo")
else:
self.assertIsInstance(r.has_capability("mergeinfo"), bool)
@@ -70,13 +71,15 @@ class TestRepository(TestCaseInTempDir):
def test_rev_root(self):
repos.create(os.path.join(self.test_dir, "foo"))
- self.assertTrue(repos.Repository("foo").fs().revision_root(0) is not None)
+ self.assertTrue(
+ repos.Repository("foo").fs().revision_root(0) is not None)
def test_load_fs_invalid(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
dumpfile = b"Malformed"
feedback = BytesIO()
- self.assertRaises(SubversionException, r.load_fs, BytesIO(dumpfile),
+ self.assertRaises(
+ SubversionException, r.load_fs, BytesIO(dumpfile),
feedback, repos.LOAD_UUID_DEFAULT)
def test_load_fs(self):
@@ -98,15 +101,19 @@ class TestRepository(TestCaseInTempDir):
""").encode("ascii")
feedback = BytesIO()
r.load_fs(BytesIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT)
- self.assertEqual(r.fs().get_uuid(), "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca")
+ self.assertEqual(r.fs().get_uuid(),
+ "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca")
def test_rev_props(self):
repos.create(os.path.join(self.test_dir, "foo"))
- self.assertEqual(["svn:date"], list(repos.Repository("foo").fs().revision_proplist(0).keys()))
+ self.assertEqual(
+ ["svn:date"],
+ list(repos.Repository("foo").fs().revision_proplist(0).keys()))
def test_rev_root_invalid(self):
repos.create(os.path.join(self.test_dir, "foo"))
- self.assertRaises(SubversionException, repos.Repository("foo").fs().revision_root, 1)
+ self.assertRaises(SubversionException,
+ repos.Repository("foo").fs().revision_root, 1)
def test_pack_fs(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
@@ -121,15 +128,17 @@ class TestRepository(TestCaseInTempDir):
repos.create(os.path.join(self.test_dir, "foo"))
root = repos.Repository("foo").fs().revision_root(0)
self.assertEqual(True, root.is_dir(""))
- # TODO(jelmer): Newer versions of libsvn_repos crash when passed a nonexistant path.
- #self.assertEqual(False, root.is_dir("nonexistant"))
+ # TODO(jelmer): Newer versions of libsvn_repos crash when passed a
+ # nonexistant path.
+ # self.assertEqual(False, root.is_dir("nonexistant"))
def test_is_file(self):
repos.create(os.path.join(self.test_dir, "foo"))
root = repos.Repository("foo").fs().revision_root(0)
self.assertEqual(False, root.is_file(""))
- # TODO(jelmer): Newer versions of libsvn_repos crash when passed a nonexistant path.
- #self.assertEqual(False, root.is_file("nonexistant"))
+ # TODO(jelmer): Newer versions of libsvn_repos crash when passed a
+ # nonexistant path.
+ # self.assertEqual(False, root.is_file("nonexistant"))
class StreamTests(TestCase):
diff --git a/subvertpy/tests/test_server.py b/subvertpy/tests/test_server.py
index a0430320..80062b67 100644
--- a/subvertpy/tests/test_server.py
+++ b/subvertpy/tests/test_server.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -14,7 +14,3 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subversion server tests."""
-
-from subvertpy.ra_svn import SVNServer
-from subvertpy.tests import SubversionTestCase
-
diff --git a/subvertpy/tests/test_wc.py b/subvertpy/tests/test_wc.py
index 08dd9d51..940cfc6e 100644
--- a/subvertpy/tests/test_wc.py
+++ b/subvertpy/tests/test_wc.py
@@ -1,5 +1,5 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
-
+
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
@@ -29,6 +29,7 @@ from subvertpy.tests import (
TestCase,
)
+
class VersionTest(TestCase):
def test_version_length(self):
@@ -65,9 +66,10 @@ class WorkingCopyTests(TestCase):
def test_match_ignore_list(self):
if wc.api_version() < (1, 5):
- self.assertRaises(NotImplementedError, wc.match_ignore_list, "foo", [])
- return # Skip test
- self.assertTrue(wc.match_ignore_list("foo", [ "f*"]))
+ self.assertRaises(
+ NotImplementedError, wc.match_ignore_list, "foo", [])
+ return # Skip test
+ self.assertTrue(wc.match_ignore_list("foo", ["f*"]))
self.assertTrue(wc.match_ignore_list("foo", ["foo"]))
self.assertFalse(wc.match_ignore_list("foo", []))
self.assertFalse(wc.match_ignore_list("foo", ["bar"]))
@@ -76,12 +78,12 @@ class WorkingCopyTests(TestCase):
class WcTests(SubversionTestCase):
def test_revision_status(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
ret = wc.revision_status("checkout")
self.assertEqual((0, 0, 0, 0), ret)
def test_revision_status_trailing(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
ret = wc.revision_status("checkout/")
self.assertEqual((0, 0, 0, 0), ret)
@@ -95,7 +97,7 @@ class AdmTests(SubversionTestCase):
"Subversion 1.7 API for WorkingCopy not yet supported")
def test_has_binary_prop(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"\x00 \x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout")
@@ -108,24 +110,27 @@ class AdmTests(SubversionTestCase):
self.build_tree({"checkout/bar": b"\x00 \x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout")
- path = os.path.join(self.test_dir, "checkout/bar")
- self.assertEqual(("%s/bar" % repos_url, 0), adm.get_ancestry("checkout/bar"))
+ self.assertEqual(("%s/bar" % repos_url, 0),
+ adm.get_ancestry("checkout/bar"))
adm.close()
def test_maybe_set_repos_root(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout")
- adm.maybe_set_repos_root(os.path.join(self.test_dir, "checkout"), repos_url)
+ adm.maybe_set_repos_root(
+ os.path.join(self.test_dir, "checkout"), repos_url)
adm.close()
def test_add_repos_file(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
- adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"), BytesIO(b"contents"), {}, {})
- self.assertEqual(b"basecontents", wc.get_pristine_contents("checkout/bar").read())
+ adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"),
+ BytesIO(b"contents"), {}, {})
+ self.assertEqual(b"basecontents",
+ wc.get_pristine_contents("checkout/bar").read())
def test_mark_missing_deleted(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"\x00 \x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
@@ -134,7 +139,7 @@ class AdmTests(SubversionTestCase):
self.assertFalse(os.path.exists("checkout/bar"))
def test_remove_from_revision_control(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"\x00 \x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
@@ -142,12 +147,12 @@ class AdmTests(SubversionTestCase):
self.assertTrue(os.path.exists("checkout/bar"))
def test_relocate(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
adm.relocate("checkout", "file://", "http://")
def test_translated_stream(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
@@ -158,7 +163,7 @@ class AdmTests(SubversionTestCase):
self.assertTrue(stream.read().startswith(b"My id: $Id: "))
def test_text_modified(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
@@ -169,7 +174,7 @@ class AdmTests(SubversionTestCase):
self.assertTrue(adm.text_modified("checkout/bar", True))
def test_props_modified(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
@@ -180,7 +185,7 @@ class AdmTests(SubversionTestCase):
self.assertTrue(adm.props_modified("checkout/bar"))
def test_prop_set(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"file"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
@@ -193,17 +198,18 @@ class AdmTests(SubversionTestCase):
if getattr(wc, "CommittedQueue", None) is None:
raise SkipTest("CommittedQueue not available")
cq = wc.CommittedQueue()
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
- adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
+ adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z",
+ "jelmer")
def test_entry_not_found(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout")
self.assertRaises(KeyError, adm.entry, "bar")
def test_entry(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"\x00 \x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout")
@@ -219,12 +225,12 @@ class AdmTests(SubversionTestCase):
self.assertEqual(1, entry.revision)
def test_get_actual_target(self):
- repos_url = self.make_client("repos", ".")
+ self.make_client("repos", ".")
self.assertEqual((self.test_dir, "bla"),
- wc.get_actual_target("%s/bla" % self.test_dir))
+ wc.get_actual_target("%s/bla" % self.test_dir))
def test_is_wc_root(self):
- repos_url = self.make_client("repos", ".")
+ self.make_client("repos", ".")
self.build_tree({"bar": None})
self.client_add('bar')
adm = wc.WorkingCopy(None, ".")
@@ -232,7 +238,7 @@ class AdmTests(SubversionTestCase):
self.assertFalse(adm.is_wc_root(os.path.join(self.test_dir, "bar")))
def test_status(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"text"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout")
@@ -242,10 +248,11 @@ class AdmTests(SubversionTestCase):
self.assertEqual(wc.STATUS_NORMAL, adm.status('bar').status)
def test_transmit_text_deltas(self):
- repos_url = self.make_client("repos", ".")
+ self.make_client("repos", ".")
self.build_tree({"bar": b"blala"})
self.client_add('bar')
adm = wc.WorkingCopy(None, ".", True)
+
class Editor(object):
"""Editor"""
@@ -261,7 +268,8 @@ class AdmTests(SubversionTestCase):
pass
editor = Editor()
(tmpfile, digest) = adm.transmit_text_deltas("bar", True, editor)
- self.assertEqual(editor._windows, [(0, 0, 5, 0, [(2, 0, 5)], b'blala'), None])
+ self.assertEqual(editor._windows,
+ [(0, 0, 5, 0, [(2, 0, 5)], b'blala'), None])
self.assertIsInstance(tmpfile, str)
self.assertEqual(16, len(digest))
@@ -271,7 +279,8 @@ class AdmTests(SubversionTestCase):
cq = wc.CommittedQueue()
cq.queue("bar", adm)
- adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
+ adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z",
+ "jelmer")
bar = adm.entry("bar")
self.assertEqual("bar", bar.name)
self.assertEqual(NODE_FILE, bar.kind)
@@ -281,20 +290,21 @@ class AdmTests(SubversionTestCase):
self.assertEqual(1, bar.revision)
def test_process_committed_queue(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"la"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
cq = wc.CommittedQueue()
cq.queue(os.path.join(self.test_dir, "checkout/bar"), adm)
- adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
+ adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z",
+ "jelmer")
bar = adm.entry("checkout/bar")
self.assertEqual("bar", bar.name)
self.assertEqual(NODE_FILE, bar.kind)
self.assertEqual(wc.SCHEDULE_ADD, bar.schedule)
def test_probe_try(self):
- repos_url = self.make_client("repos", "checkout")
+ self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": b"la"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
@@ -304,4 +314,6 @@ class AdmTests(SubversionTestCase):
(msg, num) = e.args
if num != subvertpy.ERR_WC_NOT_WORKING_COPY:
raise
- self.assertEqual("checkout", adm.probe_try(os.path.join("checkout", "bar")).access_path())
+ self.assertEqual(
+ "checkout",
+ adm.probe_try(os.path.join("checkout", "bar")).access_path())